java - 处理大数组的多线程计算

标签 java multithreading kotlin kotlin-coroutines

我有一个大型数组,需要对该数组的每个元素执行大量 CPU 工作。

基于my similar question ,Alexei Kaigodov 先生建议最好的方法是将计算的每个数据 block 拆分到每个单独的线程上。

这是我使用 Kotlin 协程实现的算法:

suspend fun predictAll(movingVehicles: List<MovingVehicle>): List<MovingVehicle?> {
    val prevTime = Timestamp(Date().time)
    val nextTime = Timestamp(Date().time)
    val ctx = Dispatchers.Default
    val processors = Runtime.getRuntime().availableProcessors()
    val chunks = movingVehicles.chunked(movingVehicles.count() / processors)
    val s = coroutineScope {
        val res = mutableListOf<Deferred<List<MovingVehicle?>>>()
        for (c in chunks) {
            val r = async(ctx) {
                c.map { predictLocation(it, prevTime, nextTime) }
            }
            res.add(r)
        }
        res.awaitAll()
    }
    return s.flatten()
}

private fun predictLocation(
    mv: MovingVehicle,
    prevTime: Timestamp,
    nextTime: Timestamp,
    relevance: Int = 5
): MovingVehicle?

它确实有效,但也许有更好的方法吗? 我正在寻找 ExecutorService,但看起来它比协程需要更多的样板代码。

最佳答案

这实际上是一种使用协程的 Kotilinic 方式。您提交可以并发执行的异步任务,然后等待它们完成。

值得深思的一个问题。一切都在线程中执行。这意味着协程也在线程上执行,如果您的任务正在阻塞,线程将被阻塞。协程不会在那里保存。因此,创建一个具有最适合应用程序的属性(背压机制、最小/最大线程数等)的线程池通常是一个好主意

现在,在您的情况下,您有 CPU 密集型任务,您无法通过拥有大量线程来实现更高的性能。对于此类任务,实际应用Amdahl's_law给出 -

#threads = #cpu-cores - 1

协程默认由 common pool 支持这与上面提到的线程数相同,因此保留默认设置似乎很好。

但是,多个库可能正在使用此池,如果其中任何一个库中有 IO 阻塞任务,则会降低性能。我建议创建您自己的 ForkJoinPool 并将其用作调度程序

val nOfThreads = Runtime.getRuntime().availableProcessors() - 1;
val ctx = ForkJoinPool( if (nOfThreads == 0) then 1 else nOfThreads).asCoroutineDispatcher()

关于java - 处理大数组的多线程计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56700865/

相关文章:

c# - Interlocked.Read/Exchange for longs on 64-bit architectures

java - 如何在@RequestBody 中自定义将字符串转换为枚举?

gradle - 使用 Gradle Kotlin DSL 构建源 jar?

collections - 如何在Kotlin中使用唯一的增量键对值进行分组?

java - 在一个 catch block 中处理所有 Java 异常

java - Tomcat 会自动编译 java servlet 吗?

java - 如何在 Java 中存储设置凭据

java - Java 中的泛型和排序

java - 使用超时和信号量进行线程阻塞

c# - 等待所有线程完成,超时