问题陈述:我有一个证券投资组合需要以并行方式处理。在Java中,我使用线程池来处理每个安全性,并使用锁存器来倒计时。完成后我会进行一些合并等。
因此,我向我的 SecurityProcessor(它是一个参与者)发送消息,并等待所有 future 完成。最后我使用 MergeHelper 进行后处理。 SecurityProcessor 接受一个安全性,执行一些 I/O 和处理并回复一个安全性
val listOfFutures = new ListBuffer[Future[Security]]()
var portfolioResponse: Portfolio = _
for (security <- portfolio.getSecurities.toList) {
val securityProcessor = actorOf[SecurityProcessor].start()
listOfFutures += (securityProcessor ? security) map {
_.asInstanceOf[Security]
}
}
val futures = Future.sequence(listOfFutures.toList)
futures.map {
listOfSecurities =>
portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
}.get
这个设计正确吗?有没有更好/更酷的方法来使用 akka 来实现这个常见问题?
最佳答案
val futureResult = Future.sequence(
portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
) map { securities => MergeHelper.merge(portfolio, securities) }
关于scala - 使用 Akka 进行 fork 和 join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8039358/