我有一堆网络请求想要并行处理。
以下伪代码应该可以很好地了解我现在正在做什么:
runBlocking {
buildList {
withContext(tracer.asContextElement()) {
items.forEach { item ->
add(
async {
// a few IO intensive operations (i.e. network requests)
}
)
}
}
}.awaitAll()
}
我已经设置了跟踪工具,并且在本地这似乎可以完成这项工作。然而,在我的生产基础设施中,异步任务按顺序执行,即第二个任务在第一个任务完成后立即开始。
我也尝试过使用 withContext(Dispatchers.IO.plus(tracer.asContextElement()))
但我观察到没有区别。
我唯一能说的是,我的开发机器有多个 CPU 核心,而我的生产机器通常只有 1 个。无论如何,由于这些进程的 IO 重度性质,我怀疑这就是问题所在。我无法真正解释造成这种情况的原因,但我的直觉是我从根本上不了解协程在 Kotlin 中的工作原理。
至于所讨论的网络请求的性质,我正在使用异步执行请求的第三方 SDK,并且似乎在幕后使用 ForkJoinPool.commonPool()
作为执行器.
最佳答案
如果您不在此处切换调度程序,所有这些协程将在同一个线程中运行 - 被 runBlocking
阻止的线程。如果每个协程内部的计算是阻塞的,它们将一一阻塞唯一的线程,而没有任何并行化的方法。这可以解释您所看到的内容(尽管奇怪的是您没有在本地重现)。
I have also tried using
withContext(Dispatchers.IO.plus(tracer.asContextElement()))
but I observe no difference.
您的修复应该有效,除非您正在执行的 IO 实际上是在管理线程本身,并且无论从何处调用它,也将执行限制在单个线程中。也许你应该研究一下实际的 IO。
编辑:您提到您通过使用通用 ForkJoinPool
的第三方 SDK 执行 IO 操作 - 该操作由单 CPU 计算机上的单个线程支持,因此这解释了原因这些调用在您的单 CPU 生产机器中并未并行化。解决这个问题的唯一选择是:
- 检查您使用的 SDK 是否允许自定义线程后备池
- 使用 JVM 属性自定义 ForkJoinPool 的大小
java.util.concurrent.ForkJoinPool.common.parallelism
- 使用其他 SDK :)
如果您以阻塞方式调用库,您仍然需要自定义调度程序,但如果您使用 Future 将异步任务转换为暂停,则不需要.await()
或类似的。
现在,此代码中还需要注意一些其他事项:
你不需要
buildList { .. }
,你可以只使用map { thing }
而不是forEach { add(thing) }
,您将获得结果列表作为返回值(它也适用于withContext
,因为它返回 lambda 结果)withContext
实际上等待所有子协程完成,因此awaitAll()
在这里放错了位置(它应该内部withContext
)实际上,你可能根本不需要
withContext
,你可以将自定义上下文直接传递给runBlocking
,除非你在中有其他东西>runBlocking
您不想在此上下文中运行(可选)如果 IO 计算不返回结果,则根本不需要
awaitAll
,只需使用launch
即可。
假设您确实需要结果,因此忽略最后一点,您当前的代码(带有调度程序修复)可以重写为:
val results = runBlocking(Dispatchers.IO + tracer.asContextElement()) {
items.map { item ->
async {
performIO(item)
}
}.awaitAll()
}
否则:
runBlocking(Dispatchers.IO + tracer.asContextElement()) {
items.map { item ->
launch {
performIO(item)
}
}
}
关于Kotlin 协同例程按顺序执行,但仅在生产机器上执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74376696/