我收到一个包含 N 个条目的大文件。对于每个条目,我都会创建一个新线程。我需要等待所有 N 个线程都终止。
一开始我使用的是 Phaser,但它的实现仅限于 65K 方。所以,爆炸了,因为 N 可能是 100K。
然后,我尝试了 CountDownLatch。这很好用,非常简单的概念和非常简单的实现。但我不知道N个数。
Phaser 是我的解决方案,但它有这个限制。
有什么想法吗?
这篇文章相关: Flexible CountDownLatch?
最佳答案
听起来您要解决的问题是尽快处理大量任务并等待处理完成。
同时处理大量任务的问题是,它可能会导致过多的上下文切换,并且会严重损坏您的计算机,并使处理速度减慢到一定数量(取决于硬件)的并发线程数以上。这意味着您需要对正在执行的并发工作线程有一个上限。
Phaser 和 CountDownLatch 都是同步原语,它们的目的是提供对关键代码块的访问控制,而不是管理并行执行。
我会使用 Executor service在这种情况下。它支持添加任务(多种形式,包括 Runnable )。
您可以使用 Executors 轻松创建 ExecutorService
类(class)。我建议使用fixed size thread pool为此,最多使用 20-100 个线程 - 取决于任务的 CPU 密集程度。任务所需的计算能力越多,在不严重降低性能的情况下可以处理的并行线程数量就越少。
有多种方法可以等待所有任务完成:
- 收集
submit
方法返回的所有Future
实例,然后简单地调用 get在他们所有人身上。这可确保每个任务在循环完成时执行。 - Shut down执行者服务和 wait for all the submitted tasks to finish 。此方法的缺点是您必须指定等待任务完成的最长时间。另外,它不太优雅,您并不总是想关闭执行器,这取决于您是否正在编写单次应用程序或之后继续运行的服务器 - 如果是服务器应用程序,您肯定必须采用以前的方法。
最后,这是一个说明这一切的代码片段:
List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);
for(TaskFromFile task : tasks) {
// createRunnable is not necessary in case your task implements Runnable
executor.submit(createRunnable(task));
}
// assuming single-shot batch job
executor.shutdown();
executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
关于java - 由于限制,灵活的 CountDownLatch 无法使用 Phaser,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33400558/