java - 转换集合流的内部元素

标签 java scala akka-stream

我最近在空闲时间学习使用 Akka Streams(在 Scala 和 Java 中),并且想知道如何实现以下场景。

我有一个连续的非常大的集合流进入我的管道,我想让管道转换每个集合中的元素。

将 Collection 转换为其元素流很容易,但我还需要将 1 Collection 中的所有转换元素一起收集回 1 个新 Collection(仅包含以前也在原始集合中的转换对象) .所以我必须知道 1 Collection 的特定元素流何时被处理,因为这样我就可以发出转换后的集合,以便在通用管道中进一步处理。

最佳答案

根据评论者的建议,您可以在transformationPipeline 中使用fold 来组装List 类型的元素。要在运行流时维护列表边界,请使用 flatMapConcat 而不是 mapConcat,如以下简单示例所示:

def transform(s: String): Int = s.length

val transformationPipeline: Flow[String, List[Int], NotUsed] = Flow[String].
  fold(List.empty[Int])((ls, s) => transform(s) :: ls).
  map(_.reverse)

val flow: Flow[List[String], List[Int], NotUsed] = Flow[List[String]].
  flatMapConcat(Source(_).via(transformationPipeline))

Source(List("a", "bb") :: List("cc", "ddd", "e") :: Nil).
  via(flow).
  runForeach(println)
// List(1, 2)
// List(2, 3, 1)

关于java - 转换集合流的内部元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64883523/

相关文章:

scala - 是否可以在 Scala 宏中从 WeakTypeTag 生成 Apply?

scala - 在 Akka-Streams 中与 mapAsync 一起使用的 ExecutionContext

scala - 为什么失败案例从未达到?

java - 调用gradle idea时如何将本地依赖放在第一位?

java - JComboBox.addItem 用于空对象

java - 读取文本文件并添加到列表时出现内存不足错误

Java:比较两个列表

scala - scalaz-stream 中的桶式水槽

scala - 在 Scala 中编写斐波那契函数的最快方法是什么?

scala - 了解 NotUsed 和 Done