scala - 使用 Akka 进行 fork 和 join

标签 scala parallel-processing future akka actor

问题陈述:我有一个证券投资组合需要以并行方式处理。在Java中,我使用线程池来处理每个安全性,并使用锁存器来倒计时。完成后我会进行一些合并等。

因此,我向我的 SecurityProcessor(它是一个参与者)发送消息,并等待所有 future 完成。最后我使用 MergeHelper 进行后处理。 SecurityProcessor 接受一个安全性,执行一些 I/O 和处理并回复一个安全性

  val listOfFutures = new ListBuffer[Future[Security]]()
  var portfolioResponse: Portfolio = _
  for (security <- portfolio.getSecurities.toList) {
    val securityProcessor = actorOf[SecurityProcessor].start()
    listOfFutures += (securityProcessor ? security) map {
      _.asInstanceOf[Security]
    }
  }
  val futures = Future.sequence(listOfFutures.toList)
  futures.map {
    listOfSecurities =>
      portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
  }.get

这个设计正确吗?有没有更好/更酷的方法来使用 akka 来实现这个常见问题?

最佳答案

val futureResult = Future.sequence(
                  portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
                ) map { securities => MergeHelper.merge(portfolio, securities) }

关于scala - 使用 Akka 进行 fork 和 join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8039358/

相关文章:

java - 顺序流与并行流 - 由中间和终端操作引入的顺序更改

c++ - 如何在列表中存储自删除 future

Flutter FutureBuilder 有数据和连接状态

java - Scala 对象扩展了 Java 类静态字段

python - 调试并行 Python 程序 (mpi4py)

swing - GridPanel 是如何确定大小的?

c++ - 并行 for 循环体的最佳大小

Java 线程 - 奇怪的 Thread.interrupted() 和 future.cancel(true) 行为

scala - 如何将案例类序列化为制表符分隔的文本文件?

scala - 为什么这个折叠不正确?