scala - Need help implementing a blocking bounded mailbox

Tags: scala akka

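I realize that blocking is frowned upon in reactive programming... but I have a simple use case. I have one producer that necessarily produces faster than a pool of consumers. For this real-world example, I estimate I need only one producer and perhaps 4 to 10 consumers (at most).

I want to use a bounded mailbox so that my (fast) producer does not get too far ahead of the (slow) consumers. In my previous (non-Akka) implementation, I used a Java BlockingQueue.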
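For reference, the BlockingQueue approach mentioned above can be sketched roughly as follows. This is a minimal illustration and not the original implementation: the object name `BlockingQueueBackpressure`, the `run` helper, and the sleep duration are all hypothetical. The point it shows is that `put` blocks while the queue is full, which is the producer-side throttling being asked for here.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: one fast producer, several slow consumers,
// coupled through a bounded blocking queue.
object BlockingQueueBackpressure {
  // Returns how many items the consumers processed.
  def run(items: Int, capacity: Int, consumers: Int): Int = {
    val queue    = new ArrayBlockingQueue[String](capacity)
    val consumed = new AtomicInteger(0)
    val done     = new CountDownLatch(items)

    val workers = (1 to consumers).map { _ =>
      val t = new Thread(new Runnable {
        def run(): Unit =
          try {
            while (true) {
              queue.take()        // blocks while the queue is empty
              Thread.sleep(5)     // simulate slow work
              consumed.incrementAndGet()
              done.countDown()
            }
          } catch { case _: InterruptedException => () } // stop on interrupt
      })
      t.setDaemon(true)
      t.start()
      t
    }

    // put() blocks while the queue is full, so the producer can never be
    // more than `capacity` items ahead of the consumers.
    for (i <- 1 to items) queue.put("work" + i)

    done.await()                  // wait until every item has been processed
    workers.foreach(_.interrupt())
    consumed.get()
  }
}
```

Calling `BlockingQueueBackpressure.run(20, 2, 4)` pushes 20 items through a queue of capacity 2 with 4 consumers; nothing is lost, because the producer simply waits whenever the queue is full.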

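import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.dispatch.{BoundedMessageQueueSemantics, RequiresMessageQueue}
import akka.routing.FromConfig

case object Start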
case object End
case class Work(msg: String)

trait Constant {
  val loop = 1000000
}


class SlowConsumerActor extends Actor with RequiresMessageQueue[BoundedMessageQueueSemantics] with Constant with ActorLogging  {
  log.info("created SlowConsumerActor")
  override def receive: Receive = {
    case Work(msg) =>
      log.info("working on " + msg)
      Thread.sleep(5000)
  }
}

class FastProducerActor extends Actor with Constant with ActorLogging {
  log.info("Created FastProducerActor")
  val slowConsumerActor = context.actorOf(FromConfig.props(Props[SlowConsumerActor]), "slowConsumerRouter")
  def doWork = {
    for (i <- 1 until 10) {
      val msg = "work" + i
      log.info("Sending " + msg)
      slowConsumerActor ! Work(msg)
      Thread.sleep(1000)
    }
  }
  override def receive: Receive = {
    case Start =>
      log.info("Got start message in FastProducerActor")
      doWork
      log.info("Sent all messages to Slow Consumer")
  }
}

object BlockingBoundedMailbox extends App {
  val sys = ActorSystem("blocking-bounded-mailbox-example")
  val fastProducerActor = sys.actorOf(Props[FastProducerActor], "producer")
  fastProducerActor ! Start
}

The application.conf is:

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-push-timeout-time = 1
  mailbox-capacity = 2
}

akka {
  loglevel = "INFO"
  stdout-loglevel = "INFO"
  actor {
      deployment {
        /producer/slowConsumerRouter {
            router = round-robin-pool
            nr-of-instances = 1
        }
      }

      mailbox.requirements {
        "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
      }

      default-dispatcher {
        type = "Dispatcher"
        executor = "thread-pool-executor"

      }
  }
}

Sample output:

[INFO] [09/01/2019 14:44:58.497] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Created FastProducerActor
[WARN] [09/01/2019 14:44:58.504] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [mailboxes] Configured potentially-blocking mailbox [bounded-mailbox] configured with non-zero pushTimeOut (1000000 nanoseconds), which can lead to blocking behavior when sending messages to this mailbox. Avoid this by setting `bounded-mailbox.mailbox-push-timeout-time` to `0`.
[INFO] [09/01/2019 14:44:58.511] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-21] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] created SlowConsumerActor
[INFO] [09/01/2019 14:44:58.513] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Got start message in FastProducerActor
[INFO] [09/01/2019 14:44:58.515] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work1
[INFO] [09/01/2019 14:44:58.518] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work1
[INFO] [09/01/2019 14:44:59.520] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work2
[INFO] [09/01/2019 14:45:00.524] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work3
[INFO] [09/01/2019 14:45:01.529] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work4
[INFO] [09/01/2019 14:45:01.534] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-6] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:02.536] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work5
[INFO] [09/01/2019 14:45:02.538] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-10] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:03.520] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work2
[INFO] [09/01/2019 14:45:03.541] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work6
[INFO] [09/01/2019 14:45:04.546] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work7
[INFO] [09/01/2019 14:45:04.548] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-16] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:05.552] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work8
[INFO] [09/01/2019 14:45:05.553] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-18] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [4] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:06.556] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work9
[INFO] [09/01/2019 14:45:06.558] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-21] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [5] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:07.561] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sent all messages to Slow Consumer
[INFO] [09/01/2019 14:45:08.524] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work3
[INFO] [09/01/2019 14:45:13.527] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work6

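Key points:

  • If the bounded mailbox is full, I do want my producer to slow down or block
  • I do not want my messages to end up in the dead letter queue

By the way, once I get the program working, I will change the mailbox capacity to a larger value, such as 10000, and raise the number of instances, or use a resizer to add more consumers dynamically.

Any help or advice over this long weekend would be greatly appreciated.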

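Best answer

Found the answer, and it is very simple. I had set the mailbox push timeout (`mailbox-push-timeout-time`) too low, so once the timeout expired, the message was sent to dead letters. I changed the push timeout to a much larger value, which effectively makes the single-threaded producer wait, and that is exactly what I wanted.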
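To see why the timeout matters, note that the warning in the log above reports the configured value of `1` as 1000000 nanoseconds, i.e. 1 ms. As far as I understand it, a bounded mailbox behaves like a bounded blocking queue with a timed offer: the sender waits up to the push timeout for free space and the message goes to dead letters if none appears. The sketch below imitates that behaviour with plain `java.util.concurrent` classes. It is an analogy under that assumption, not Akka's code, and `PushTimeoutSketch`, `dropped`, and all the numbers are hypothetical.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical analogy for a bounded mailbox with a push timeout.
object PushTimeoutSketch {
  // Offers `items` messages to a queue of the given capacity while a single
  // consumer spends `workMillis` on each message. Returns how many messages
  // could not be enqueued within `pushTimeoutMillis` (the "dead letters").
  def dropped(items: Int, capacity: Int, workMillis: Long, pushTimeoutMillis: Long): Int = {
    val queue = new LinkedBlockingQueue[String](capacity)
    val consumer = new Thread(new Runnable {
      def run(): Unit =
        try {
          while (true) { queue.take(); Thread.sleep(workMillis) }
        } catch { case _: InterruptedException => () } // stop on interrupt
    })
    consumer.setDaemon(true)
    consumer.start()

    var lost = 0
    for (i <- 1 to items) {
      // Timed offer: wait up to the push timeout for free space, then give up.
      val accepted = queue.offer("work" + i, pushTimeoutMillis, TimeUnit.MILLISECONDS)
      if (!accepted) lost += 1
    }
    consumer.interrupt()
    lost
  }
}
```

With a 1 ms timeout and a consumer that needs 50 ms per message, `PushTimeoutSketch.dropped(10, 2, 50, 1)` loses messages; with a generous timeout, `PushTimeoutSketch.dropped(10, 2, 20, 5000)` loses none, because the producer waits instead. In application.conf the corresponding change is a larger `mailbox-push-timeout-time`, for example `mailbox-push-timeout-time = 10s` (an illustrative value; the answer does not say which value was used).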

For "scala - Need help implementing a blocking bounded mailbox", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57749760/
