scala - 如何创建不受背压影响的 Source

标签 scala akka akka-stream backpressure

我想测试一些 Akka 流功能,例如 conflate .为此,我需要在简单的单元测试中构建一个不受背压影响的源。天真的尝试,如

Source.tick(1.milli, 1.milli, "tick").map(_ => Random.nextDouble())

由于背压而不工作。 OTOH 通过 HTTP 可能是矫枉过正。

我将如何创建 简单 Source对于不受背压影响的单元测试?

最佳答案

您可以使用 Source.actorRef这是 - 按设计 - 不支持背压。请参阅下面的示例:

  val actorRef: ActorRef = Source.actorRef(0, OverflowStrategy.dropNew)
      .map(_ => Random.nextDouble())
      .to(yourSink).run()

  system.scheduler.schedule(1.milli, 1.milli, actorRef, "tick")(system.dispatcher)

这里的 bufferSize 参数和溢出策略是随机选择的,您需要根据测试的需要调整它们。

更多信息 Source.actorRef可以在 docs 中找到.

关于scala - 如何创建不受背压影响的 Source,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43767664/

相关文章:

scala 类型问题 : SoftReference, ReferenceQueues, SoftHashMap

scala - 如何在 Playframework 中使用 Oracle 存储过程和 Scala Anorm

scala 的特征和来自《Scala 编程》第二版的 super 示例

scala - 如何透明地将输入元素与输出元素相关联

scala - `BoundedSourceQueue` from `Source.queue` 对于并发生产者来说可以吗?

scala - dispatch.Http.shutdown() 放在哪里

java - 整个集群上的 Akka ActorSelection

scala - 在 Scala 中使用 Akka Http 回显简单的 HTTP 服务器

logging - Akka 自定义记录器收不到一些消息

scala - Akka Stream,来自功能的来源?