Kotlin 协同例程按顺序执行,但仅在生产机器上执行

标签 kotlin concurrency kotlin-coroutines coroutine

我有一堆网络请求想要并行处理。

以下伪代码应该可以很好地了解我现在正在做什么:

runBlocking {
    buildList {
        withContext(tracer.asContextElement()) {
            items.forEach { item ->
                add(
                    async {
                        // a few IO intensive operations (i.e. network requests)
                    }
                )
            }
        }
    }.awaitAll()
}

我已经设置了跟踪工具,并且在本地这似乎可以完成这项工作。然而,在我的生产基础设施中,异步任务按顺序执行,即第二个任务在第一个任务完成后立即开始。

我也尝试过使用 withContext(Dispatchers.IO.plus(tracer.asContextElement())) 但我观察到没有区别。

我唯一能说的是,我的开发机器有多个 CPU 核心,而我的生产机器通常只有 1 个。无论如何,由于这些进程的 IO 重度性质,我怀疑这就是问题所在。我无法真正解释造成这种情况的原因,但我的直觉是我从根本上不了解协程在 Kotlin 中的工作原理。

至于所讨论的网络请求的性质,我正在使用异步执行请求的第三方 SDK,并且似乎在幕后使用 ForkJoinPool.commonPool() 作为执行器.

最佳答案

如果您不在此处切换调度程序,所有这些协程将在同一个线程中运行 - 被 runBlocking 阻止的线程。如果每个协程内部的计算是阻塞的,它们将一一阻塞唯一的线程,而没有任何并行化的方法。这可以解释您所看到的内容(尽管奇怪的是您没有在本地重现)。

I have also tried using withContext(Dispatchers.IO.plus(tracer.asContextElement())) but I observe no difference.

您的修复应该有效,除非您正在执行的 IO 实际上是在管理线程本身,并且无论从何处调用它,也将执行限制在单个线程中。也许你应该研究一下实际的 IO。

编辑:您提到您通过使用通用 ForkJoinPool 的第三方 SDK 执行 IO 操作 - 该操作由单 CPU 计算机上的单个线程支持,因此这解释了原因这些调用在您的单 CPU 生产机器中并未并行化。解决这个问题的唯一选择是:

  1. 检查您使用的 SDK 是否允许自定义线程后备池
  2. 使用 JVM 属性自定义 ForkJoinPool 的大小 java.util.concurrent.ForkJoinPool.common.parallelism
  3. 使用其他 SDK :)

如果您以阻塞方式调用库,您仍然需要自定义调度程序,但如果您使用 Future 将异步任务转换为暂停,则不需要.await() 或类似的。


现在,此代码中还需要注意一些其他事项:

  • 你不需要buildList { .. },你可以只使用map { thing }而不是forEach { add(thing) },您将获得结果列表作为返回值(它也适用于 withContext,因为它返回 lambda 结果)

  • withContext 实际上等待所有子协程完成,因此 awaitAll() 在这里放错了位置(它应该内部 withContext)

  • 实际上,你可能根本不需要 withContext,你可以将自定义上下文直接传递给 runBlocking,除非你在 中有其他东西>runBlocking 您不想在此上下文中运行

  • (可选)如果 IO 计算不返回结果,则根本不需要 awaitAll,只需使用 launch 即可。

假设您确实需要结果,因此忽略最后一点,您当前的代码(带有调度程序修复)可以重写为:

val results = runBlocking(Dispatchers.IO + tracer.asContextElement()) {
    items.map { item ->
        async {
            performIO(item)
        }
    }.awaitAll()
}

否则:

runBlocking(Dispatchers.IO + tracer.asContextElement()) {
    items.map { item ->
        launch {
            performIO(item)
        }
    }
}

关于Kotlin 协同例程按顺序执行,但仅在生产机器上执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74376696/

相关文章:

android - 使用协程对任务进行排队

java - Rebase方法拦截,完全重写方法

android - 是否可以在 Room @DAO 中编写返回 LiveData 的 "suspend"函数?

gradle - 如何使 compileKotlin 依赖于 compileJava in gradle

Android 每 x 分钟运行一次 Kotlin Coroutine

spring-boot - 使用 Kotlin 协程的 Spring Boot Rest 服务

android - 潜在的空指针异常。某些布局版本中缺少资源

c++ - 如何解决用于在 C++ 中维护静态局部变量的服务数据的线程安全问题?

java - AWS Lambda 的并发行为

haskell - 在 Haskell 中测试异步异常下的正确性