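I realize that blocking is frowned upon in reactive programming, but I have a simple use case. I have a single producer that is bound to produce faster than a pool of consumers can consume. For this real-world case I expect to need only one producer and perhaps 4 to 10 consumers at most.
I want to use a bounded mailbox so that my (fast) producer does not get too far ahead of the (slow) consumers. In my previous (non-Akka) implementation I used a Java BlockingQueue.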
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.dispatch.{BoundedMessageQueueSemantics, RequiresMessageQueue}
import akka.routing.FromConfig

case object Start
case object End
case class Work(msg: String)

trait Constant {
  val loop = 1000000
}

// Consumer: requires a bounded mailbox and simulates slow work (5 s per message).
class SlowConsumerActor extends Actor
    with RequiresMessageQueue[BoundedMessageQueueSemantics]
    with Constant
    with ActorLogging {
  log.info("created SlowConsumerActor")

  override def receive: Receive = {
    case Work(msg) =>
      log.info("working on " + msg)
      Thread.sleep(5000)
  }
}

// Producer: sends one message per second to the router defined in application.conf.
class FastProducerActor extends Actor with Constant with ActorLogging {
  log.info("Created FastProducerActor")
  val slowConsumerActor = context.actorOf(FromConfig.props(Props[SlowConsumerActor]), "slowConsumerRouter")

  def doWork(): Unit = {
    for (i <- 1 until 10) { // sends work1 .. work9
      val msg = "work" + i
      log.info("Sending " + msg)
      slowConsumerActor ! Work(msg)
      Thread.sleep(1000)
    }
  }

  override def receive: Receive = {
    case Start =>
      log.info("Got start message in FastProducerActor")
      doWork()
      log.info("Sent all messages to Slow Consumer")
  }
}

object BlockingBoundedMailbox extends App {
  val sys = ActorSystem("blocking-bounded-mailbox-example")
  val fastProducerActor = sys.actorOf(Props[FastProducerActor], "producer")
  fastProducerActor ! Start
}
The application.conf is:
bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-push-timeout-time = 1
  mailbox-capacity = 2
}
akka {
  loglevel = "INFO"
  stdout-loglevel = "INFO"
  actor {
    deployment {
      /producer/slowConsumerRouter {
        router = round-robin-pool
        nr-of-instances = 1
      }
    }
    mailbox.requirements {
      "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
    }
    default-dispatcher {
      type = "Dispatcher"
      executor = "thread-pool-executor"
    }
  }
}
Sample output:
[INFO] [09/01/2019 14:44:58.497] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Created FastProducerActor
[WARN] [09/01/2019 14:44:58.504] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [mailboxes] Configured potentially-blocking mailbox [bounded-mailbox] configured with non-zero pushTimeOut (1000000 nanoseconds), which can lead to blocking behavior when sending messages to this mailbox. Avoid this by setting `bounded-mailbox.mailbox-push-timeout-time` to `0`.
[INFO] [09/01/2019 14:44:58.511] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-21] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] created SlowConsumerActor
[INFO] [09/01/2019 14:44:58.513] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Got start message in FastProducerActor
[INFO] [09/01/2019 14:44:58.515] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work1
[INFO] [09/01/2019 14:44:58.518] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work1
[INFO] [09/01/2019 14:44:59.520] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work2
[INFO] [09/01/2019 14:45:00.524] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work3
[INFO] [09/01/2019 14:45:01.529] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work4
[INFO] [09/01/2019 14:45:01.534] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-6] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:02.536] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work5
[INFO] [09/01/2019 14:45:02.538] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-10] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:03.520] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work2
[INFO] [09/01/2019 14:45:03.541] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work6
[INFO] [09/01/2019 14:45:04.546] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work7
[INFO] [09/01/2019 14:45:04.548] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-16] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:05.552] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work8
[INFO] [09/01/2019 14:45:05.553] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-18] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [4] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:06.556] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sending work9
[INFO] [09/01/2019 14:45:06.558] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-21] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] Message [com.example.Work] from Actor[akka://blocking-bounded-mailbox-example/user/producer#-344546494] to Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860] was not delivered. [5] dead letters encountered. If this is not an expected behavior, then [Actor[akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a#1170076860]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/01/2019 14:45:07.561] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-19] [akka://blocking-bounded-mailbox-example/user/producer] Sent all messages to Slow Consumer
[INFO] [09/01/2019 14:45:08.524] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work3
[INFO] [09/01/2019 14:45:13.527] [blocking-bounded-mailbox-example-akka.actor.default-dispatcher-2] [akka://blocking-bounded-mailbox-example/user/producer/slowConsumerRouter/$a] working on work6
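The key points:
- If the bounded mailbox is full, I do want my producer to slow down or block.
- I do not want my messages to go to the dead letter queue.
By the way, once I have the program working, I will change the mailbox capacity to something larger, such as 10000, and increase the number of instances, or use a resizer to add consumers dynamically.
Any help or advice over this long weekend would be much appreciated.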
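Best answer
Found the answer, and it is very simple. I had set the mailbox push timeout too low, so once the timeout expired, the message was sent to dead letters. I changed the mailbox push timeout to a larger value, which effectively makes the single-threaded producer wait, and that is exactly what I wanted.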
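For illustration, the change might look like the fragment below. The answer does not state the value that was used, so `60s` is only an assumed placeholder; it should be longer than the time a consumer needs to free a slot in the mailbox. The WARN line in the output shows that the original unitless `1` was read as 1000000 nanoseconds (1 ms), which is why sends to a full mailbox gave up almost immediately.

```
bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 2
  # Assumed value, not taken from the answer: how long a send waits
  # for free space in a full mailbox before the message goes to dead letters.
  mailbox-push-timeout-time = 60s
}
```

With a timeout like this, `slowConsumerActor ! Work(msg)` waits inside the producer whenever the mailbox is full. As the same WARN line points out, that wait blocks the sending thread, which is acceptable here only because blocking the single producer is the intended behavior.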
Source: "scala - Need help implementing a blocking bounded mailbox", a similar question found on Stack Overflow: https://stackoverflow.com/questions/57749760/