我有一个大型数组,需要对该数组的每个元素执行大量 CPU 工作。
基于my similar question ,Alexei Kaigodov 先生建议最好的方法是将计算的每个数据 block 拆分到每个单独的线程上。
这是我使用 Kotlin 协程实现的算法:
suspend fun predictAll(movingVehicles: List<MovingVehicle>): List<MovingVehicle?> {
val prevTime = Timestamp(Date().time)
val nextTime = Timestamp(Date().time)
val ctx = Dispatchers.Default
val processors = Runtime.getRuntime().availableProcessors()
val chunks = movingVehicles.chunked(movingVehicles.count() / processors)
val s = coroutineScope {
val res = mutableListOf<Deferred<List<MovingVehicle?>>>()
for (c in chunks) {
val r = async(ctx) {
c.map { predictLocation(it, prevTime, nextTime) }
}
res.add(r)
}
res.awaitAll()
}
return s.flatten()
}
private fun predictLocation(
mv: MovingVehicle,
prevTime: Timestamp,
nextTime: Timestamp,
relevance: Int = 5
): MovingVehicle?
它确实有效,但也许有更好的方法吗? 我正在寻找 ExecutorService,但看起来它比协程需要更多的样板代码。
最佳答案
这实际上是一种使用协程的 Kotilinic 方式。您提交可以并发执行的异步任务,然后等待它们完成。
值得深思的一个问题。一切都在线程中执行。这意味着协程也在线程上执行,如果您的任务正在阻塞,线程将被阻塞。协程不会在那里保存。因此,创建一个具有最适合应用程序的属性(背压机制、最小/最大线程数等)的线程池
通常是一个好主意
现在,在您的情况下,您有 CPU 密集型任务,您无法通过拥有大量线程来实现更高的性能。对于此类任务,实际应用Amdahl's_law给出 -
#threads = #cpu-cores - 1
协程默认由 common pool 支持这与上面提到的线程数相同,因此保留默认设置似乎很好。
但是,多个库可能正在使用此池,如果其中任何一个库中有 IO 阻塞任务,则会降低性能。我建议创建您自己的 ForkJoinPool 并将其用作调度程序
val nOfThreads = Runtime.getRuntime().availableProcessors() - 1;
val ctx = ForkJoinPool( if (nOfThreads == 0) then 1 else nOfThreads).asCoroutineDispatcher()
关于java - 处理大数组的多线程计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56700865/