akka - 如何使用 Akka BoundedMailBox 来限制生产者

标签 akka backpressure

我有两个参与者,一个正在生成消息,另一个正在以固定速率使用消息。

是否有可能让生产者受到消费者 BoundedMailBox 的限制? (背压)

我的生产者目前是定期安排的(向它发送一个滴答消息),有没有办法让它根据消费者邮箱中的可用性来安排?

我使用的是即发即忘式 (consumer.tell()),因为我不需要响应。我应该使用不同的消息发送方法吗?

最佳答案

只需指定一个邮箱限制,如果邮箱已满,它就会出现阻塞。 我自己还没有尝试过,但是这个线程中的人正在查看行为,并且都发现一旦邮箱达到极限, Actor 就会阻塞。

有关讨论和更多此类性质的测试,请参见此处。 https://groups.google.com/forum/?fromgroups=#!topic/akka-user/e0tebq5V4nM

来自那个线程:

object ProducerConsumer extends App {

  implicit val system = ActorSystem("ProducerConsumer")

  def waitFor(actor: ActorRef) {
    Await.ready(gracefulStop(actor, 5.seconds), 5.seconds)
  }

  val consumers = system.actorOf(Props[Consumer].
    withRouter(RoundRobinRouter(4)).
    withDispatcher("consumer-dispatcher"), "consumer")

  for (work <- generateWork)
    consumers ! work

  consumers ! PoisonPill
  waitFor(consumers)
  system.shutdown
}

应用程序配置文件:

consumer-dispatcher {
  type = BalancingDispatcher
  mailbox-capacity = 100
}

关于akka - 如何使用 Akka BoundedMailBox 来限制生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16500352/

相关文章:

scala - 为什么抽象覆盖失败

java - 如何在 Spring WebFlux 中配置背压?

scala - Akka Streams 的 ActorPublisher 作为 Web 响应的源 - 背压如何工作

javascript - RxJs:zip 运算符的有损形式

scala - Akka HTTP 背压连接

scala - 在 Actors 外部登录 Akka TestKit

scala - 无法部署本地 Spark 作业,worker 因 EndPointAssociationError 失败

scala - Akka:什么时候可以安全地发送消息

java - Apache Flink 作业集群 rpc.address 绑定(bind)到 kubernetes 上的本地主机

scala - 与 Akka Streams 同步反馈