我最近在空闲时间学习使用 Akka Streams(在 Scala 和 Java 中),并且想知道如何实现以下场景。
我有一个连续的非常大的集合流进入我的管道,我想让管道转换每个集合中的元素。
将 Collection 转换为其元素流很容易,但我还需要将 1 Collection 中的所有转换元素一起收集回 1 个新 Collection(仅包含以前也在原始集合中的转换对象) .所以我必须知道 1 Collection 的特定元素流何时被处理,因为这样我就可以发出转换后的集合,以便在通用管道中进一步处理。
最佳答案
根据评论者的建议,您可以在transformationPipeline
中使用fold
来组装List 类型的元素。要在运行流时维护列表边界,请使用 flatMapConcat
而不是 mapConcat
,如以下简单示例所示:
def transform(s: String): Int = s.length
val transformationPipeline: Flow[String, List[Int], NotUsed] = Flow[String].
fold(List.empty[Int])((ls, s) => transform(s) :: ls).
map(_.reverse)
val flow: Flow[List[String], List[Int], NotUsed] = Flow[List[String]].
flatMapConcat(Source(_).via(transformationPipeline))
Source(List("a", "bb") :: List("cc", "ddd", "e") :: Nil).
via(flow).
runForeach(println)
// List(1, 2)
// List(2, 3, 1)
关于java - 转换集合流的内部元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64883523/