我的 Java 应用程序适用于文件夹中的音乐文件,它旨在并行且独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该服务的最大池大小与计算机的 CPU 不匹配。
例如,如果我们有 8 个 CPU 的计算机,则理论上可以同时处理 8 个文件夹,如果我们有一台 16 CPU 的计算机,则可以同时处理 16 个文件夹。如果我们只有 1 个 CPU,那么我们将池大小设置为 3,以便在某个文件夹阻塞 I/O 时允许 CPU 继续执行某些操作。
但是,我们实际上并没有只有一个 ExecutorService,我们有多个 ExecutorService,因为每个文件夹都可以经历多个阶段。
进程1(使用ExecutorService1)→进程2(ExecutorService2)→进程3(ExecutorService3)
进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行器,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。 我们实际上有 12 个进程,但任何特定文件夹都不可能经历所有 12 个进程
但我意识到这是有缺陷的,因为在 16-CPU 计算机的情况下,每个 ES 的池大小可以为 16,所以我们实际上有 48 个线程在运行,这只会导致太多的争用。
所以我要做的就是让所有进程(Process1、Process2…)使用相同的 ExecutorService,这样我们就只能使用与 CPU 匹配的工作线程。
但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一项任务(加载所有文件夹),然后我们调用 shutdown(),直到所有内容都提交给 Process0,然后关闭,这才会完成。在将所有内容发送到 Process1 之前,Process0 上的 () 不会成功,依此类推。
//Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());
for (ExecutorService service : services)
//Request Shutdown
service.shutdown();
//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work
但是,如果所有内容都在同一个 ES 上并且 Process1 正在提交给 Process2,则这将不再起作用,因为在调用 shutdown() 时,并非 Process1 会提交给 Process2 的所有文件夹,因此它会提前关闭。
那么,当该 ES 上的任务可以提交给同一 ES 上的其他任务时,如何使用单个 ExecutorService 检测所有工作何时完成?
或者有更好的方法吗?
注意,你可能会想为什么他不将 Process1,2 & 3 的逻辑合并到一个 Process 中。困难在于,虽然我最初按文件夹对歌曲进行分组,但有时歌曲会被分成较小的组,并且它们会被分配到行内的单独进程中,并且不一定是相同的进程,实际上总共有 12 个进程。
基于Sholms想法的尝试
主线程
private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();
使用 MainAnalyserService 中的此函数提交新任务的流程代码
public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}
看起来可以正常工作,但失败了
java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
现在我意识到,我无法让一个线程调用 future.get() (等待完成),同时其他线程正在添加到列表中。
最佳答案
我同意 Shloim 的观点,即您不需要多个 ExecutorService
实例,只需一个实例(大小取决于您可用的 CPU 数量)就足够了,而且实际上是最佳的。实际上,我认为你可能不需要 ExecutorService;如果您使用信号完整性的外部机制,一个简单的Executor
就可以完成这项工作。
我将首先构建一个类来表示整个更大的工作项。如果您需要使用每个子工作项的结果,则可以使用队列,但如果您只想知道是否还有工作要做,则只需要一个计数器。
例如,您可以执行以下操作:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private int pendingItems; // guarded by monitor lock on this instance
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public synchronized void enqueueMoreWork(File file) {
pendingItems++;
executor.execute(new FileWork(file, this));
}
public synchronized void markWorkItemCompleted() {
pendingItems--;
notifyAll();
}
public synchronized boolean hasPendingWork() {
return pendingItems > 0;
}
public synchronized void awaitCompletion() {
while (pendingItems > 0) {
wait();
}
}
}
public class FileWork implements Runnable {
private final File file;
private final FolderWork parent;
public FileWork(File file, FolderWork parent) {
this.file = file;
this.parent = parent;
}
@Override
public void run() {
try {
// do some work with the file
if (/* found more work to do */) {
parent.enqueueMoreWork(...);
}
} finally {
parent.markWorkItemCompleted();
}
}
}
如果您担心 pendingItems
计数器的同步开销,您可以使用 AtomicInteger
来代替。然后你需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用 CountDownLatch
。这是一个示例实现:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public void enqueueMoreWork(File file) {
if (latch.getCount() == 0) {
throw new IllegalStateException(
"Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
}
pendingItems.incrementAndGet();
executor.execute(new FileWork(file, this));
}
public void markWorkItemCompleted() {
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0) {
latch.countDown();
}
}
public boolean hasPendingWork() {
return pendingItems.get() > 0;
}
public void awaitCompletion() {
latch.await();
}
}
你可以这样调用它:
Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();
此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的 pendingItems
计数器来跟踪剩余工作量要做的事。
关于java - 我如何知道 ExecutorService 何时完成 ES 上的项目是否可以重新提交到 ES,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56617083/