我有一个使用 ExecutionContext (EC) 和 akka (ActorSystem) 构建的代码。这段代码做了一些非常奇特的事情:它使用带有 parallelism-max = 1
的 AkkaForkJoinPool 并执行如下操作:
implicit ec = // akka EC backed by AkkaForkJoinPool with parallelism=1
Future{ // (1)
// (2) get data from DB which uses a separate ExecutionContext for IO
val data: Future[Data] = getData()
// (3) use the data
data.map{ whatEver }
// etc ...
}
[编辑:我知道,这样说,拥有顶级 Future (1) 很奇怪。但实际上,代码不是我自己的,它跨越了多个函数,并使用了更复杂的操作,例如几个包装的 for 理解。所以我不会改变]
现在我移动了这段代码,并按照相同的规则用我自己的代码替换了 akka 提供的隐式 ExecutionContext (EC):我使用并行度 = 1 的 (java) ForkJoinPool。
因此,这段代码会卡在 map (3) 处。我的理解是,当调用映射(3)时,它需要一个线程,但 EC 无法提供线程,因为它唯一可用的线程被 Future(1)占用。
我不清楚 ForkJoinPool 是如何工作的。所以我的问题是我理解是否正确,并且:
- 如果没有,我就错误地使用了 java ForkJoinPool。 IE。有办法让它工作吗?
- 如果是,akka 是如何管理的?
我正在使用 akka 2.3.15、scala 2.11.12 和 java 8
最佳答案
不要将所有内容包装在 future 中,而是对第一个 future 的结果使用 for 理解,因为一切都依赖于它。
for {
data <- getData()
} yield data.map( whatEver )
或
getData().map { data =>
data.map { whatEver }
}
关于java - 使用单线程 ForkJoinPool 在 Future 内运行 Future.map,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52318653/