出于某种原因,我的 Akka 流总是在“发出”(?)第一个消息之前等待第二个消息。
这里有一些示例代码可以说明我的问题。
val rx = Source((1 to 100).toStream.map { t =>
Thread.sleep(1000)
println(s"doing $t")
t
})
rx.runForeach(println)
产量输出:
doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...
我想要的:
doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...
最佳答案
您的代码现在设置的方式是,在允许开始向下游发出元素之前,您正在完全转换 Source
。通过删除表示源的数字范围内的 toStream
,您可以清楚地看到该行为(如@slouc 所述)。如果这样做,您将看到 Source
在开始响应下游需求之前首先被完全转换。如果您真的想将 Source
运行到 Sink
中并在中间有一个转换步骤,那么您可以尝试这样构造:
val transform =
Flow[Int].map{ t =>
Thread.sleep(1000)
println(s"doing $t")
t
}
Source((1 to 100).toStream).
via(transform ).
to(Sink.foreach(println)).
run
如果您进行了该更改,那么您将获得所需的效果,即在开始处理下一个元素之前,流向下游的元素在流中一直得到处理。
关于scala - Akka Reactive Streams 总是落后一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38294042/