scala - Akka 流 : how to wait until several Flows are completed

标签 scala akka akka-stream

我有好几个 Flow s 在我的程序中,我想并行处理。全部完成后,我想触发一些 Action 。

一种方法是在每次完成后向 Actor 发送一条消息,当 Actor 验证所有流都准备就绪时,它可以触发操作。

我想知道 akka-streams Scala DSL 中是否有我可能忽略的任何内容,这会使它变得更加简单。

编辑 :
将 Flow 转换为 Future 是行不通的,因为正如文档中提到的,Future 在流中发生的第一个事件之后立即完成。运行以下代码:

implicit val system = ActorSystem("Sys")
val fm = FlowMaterializer(MaterializerSettings())

def main(args: Array[String]): Unit = {
  val fut = Flow(1 second, {() => println("tick")}).toFuture(fm)

  fut.onComplete{ _ =>
    println("future completed")
  }
}

打印“滴答”,然后是“ future 完成”,然后是无限序列的“滴答”。

最佳答案

正如评论中提到的,我相信@Eduardo 关于转换 Flow 是正确的。到 Future .考虑这个例子:

implicit val system = ActorSystem("Sys")
import system.dispatcher

val text1 = 
  """hello1world
  foobar""".stripMargin

val text2 = 
  """this1is
  a1test""".stripMargin

def flowFut(text:String) = Flow(text.split("\\s").toVector)
  .map(_.toUpperCase())
  .map(_.replace("1", ""))
  .toFuture(FlowMaterializer(MaterializerSettings()))    


val fut1 = flowFut(text1)    
val fut2 = flowFut(text2)    
val fut3 = for{
  f1 <- fut1
  f2 <- fut2
} yield {
  s"$f1, $f2"
}

fut3 foreach {println(_)}

在这里,我在每组文本行上运行两个单独的转换,转换为 upper 并从任何文本中删除 #1。然后我强制执行这个 Flow 的结果到 Future所以我可以将结果组合成一个新的 Future然后我打印出来。

关于scala - Akka 流 : how to wait until several Flows are completed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25331556/

相关文章:

scala - 配置 IntelliJ IDEA 进行 Specs2 测试的后台编译?

scala - Akka Actor "ask"和 "Await"带有 TimeoutException

scala - 你如何在最新的 Akka (2.4.6) 中限制 Flow?

c++ - 从 C++ 应用程序到 Akka actor 的推荐方式

scala - 如何使用 Play 2.5 在分块响应中将 Anorm 大型查询结果流式传输到客户端

scala - 使用相互递归函数时,是否有一种简单的方法来处理流?

scala - SBT 0.13.7 和 Scala 2.11.5 版本兼容性问题

Scala 参数化方法和算术运算

scala - 带有 akka-cluster 的 akka-streams

scala - Akka Stream 连接到多个接收器