scala - Akka Reactive Streams 总是落后一条消息

标签 scala akka akka-stream reactive-streams

出于某种原因,我的 Akka 流总是在“发出”(?)第一个消息之前等待第二个消息。

这里有一些示例代码可以说明我的问题。

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)

产量输出:

doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...

我想要的:

doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...

最佳答案

您的代码现在设置的方式是,在允许开始向下游发出元素之前,您正在完全转换 Source。通过删除表示源的数字范围内的 toStream,您可以清楚地看到该行为(如@slouc 所述)。如果这样做,您将看到 Source 在开始响应下游需求之前首先被完全转换。如果您真的想将 Source 运行到 Sink 中并在中间有一个转换步骤,那么您可以尝试这样构造:

val transform =
  Flow[Int].map{ t =>
    Thread.sleep(1000)
    println(s"doing $t")
    t
  }

Source((1 to 100).toStream).
  via(transform ).
  to(Sink.foreach(println)).
  run

如果您进行了该更改,那么您将获得所需的效果,即在开始处理下一个元素之前,流向下游的元素在流中一直得到处理。

关于scala - Akka Reactive Streams 总是落后一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38294042/

相关文章:

scala - 让 Scala 解释器在解释调用之间忘记

scala - pyspark簇yarn中的spark-sftp jar出现错误

scala - 从 GraphStage (Akka 2.4.2) 内部关闭 Akka 流

scala - 使用 Akka Http 进行多个绑定(bind)

Scala 初学者在多个文件上编译和运行 scala 程序

scala - 如何防止Intellij Scala编译器将未使用的导入标记为错误?

scala - akka流读取无尽的http流在读取时背压

scala - 如何使用匹配类型列表中的模式匹配结果?

json - 使用 AKKA Stream 解码分块 JSON

scala - 如何真实测试akka流?