akka - 如何限制 Akka Stream 每秒只执行和发送一条消息?

标签 akka rate rate-limiting akka-stream

我有一个 Akka Stream,我希望该流大约每秒向下游发送消息。

我尝试了两种方法来解决这个问题,第一种方法是让流开始时的生产者只在有 Continue 消息进入这个 actor 时每秒发送一次消息。
// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }
这工作了一小会儿,然后大量的 Continue 消息出现在 ActorPublisher Actor 中,我假设(猜测但不确定)来自下游通过背压请求消息,因为下游可以快速消耗但上游没有以快速的速度生产.所以这个方法失败了。

我尝试的另一种方法是通过背压控制,我使用了 MaxInFlightRequestStrategyActorSubscriber在流的末尾将消息数限制为每秒 1 条。这有效,但传入的消息大约一次传入三个左右,而不是一次传入一个。似乎背压控制不会立即改变传入消息的速率,或者消息已经在流中排队等待处理。

所以问题是,我怎样才能拥有一个每秒只能处理一条消息的 Akka Stream?

我发现 MaxInFlightRequestStrategy是一种有效的方法,但我应该将批量大小设置为 1,它的批量大小默认为 5,这导致了我发现的问题。现在我正在查看提交的答案,这也是解决问题的一种过于复杂的方法。

最佳答案

您可以让您的元素通过节流流,这将对快速源产生背压,或者您可以使用 tick 的组合。和 zip .

第一个解决方案是这样的:

val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val throttlingFlow = Flow[Long].throttle(
  // how many elements do you allow
  elements = 1,
  // in what unit of time
  per = 1.second,
  maximumBurst = 0,
  // you can also set this to Enforcing, but then your
  // stream will collapse if exceeding the number of elements / s
  mode = ThrottleMode.Shaping
)

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))

第二种解决方案是这样的:
val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))

关于akka - 如何限制 Akka Stream 每秒只执行和发送一条消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36215227/

相关文章:

javascript - 定时 promise 队列/节流

amazon-web-services - AWS WAF 如何将 IP 路径的速率限制为低于 2000 个请求/分钟

scala - 要使此代码正常工作(一般情况下)需要导入什么?

java - 如何在scala akka(spray)中编写休息服务的测试用例

r - 用 R 中的置信区间计算亚组的年龄标准化率

jquery - 防止重复匿名投票

scala - 在哪里可以找到 akka.cloud 包?

java - 如何将基于 Actor 的源与 Akka Graph 结合使用?

python - 在 Python 2.7 中应用折扣并显示折扣率

php - 如何管理请求受限的 API 调用