java - 我如何知道 ExecutorService 何时完成 ES 上的项目是否可以重新提交到 ES

标签 java executorservice java.util.concurrent executor

我的 Java 应用程序适用于文件夹中的音乐文件,它旨在并行且独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该服务的最大池大小与计算机的 CPU 不匹配。

例如,如果我们有 8 个 CPU 的计算机,则理论上可以同时处理 8 个文件夹,如果我们有一台 16 CPU 的计算机,则可以同时处理 16 个文件夹。如果我们只有 1 个 CPU,那么我们将池大小设置为 3,以便在某个文件夹阻塞 I/O 时允许 CPU 继续执行某些操作。

但是,我们实际上并没有只有一个 ExecutorService,我们有多个 ExecutorService,因为每个文件夹都可以经历多个阶段。

进程1(使用ExecutorService1)→进程2(ExecutorService2)→进程3(ExecutorService3)

进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行器,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。 我们实际上有 12 个进程,但任何特定文件夹都不可能经历所有 12 个进程

但我意识到这是有缺陷的,因为在 16-CPU 计算机的情况下,每个 ES 的池大小可以为 16,所以我们实际上有 48 个线程在运行,这只会导致太多的争用。

所以我要做的就是让所有进程(Process1、Process2…)使用相同的 ExecutorService,这样我们就只能使用与 CPU 匹配的工作线程。

但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一项任务(加载所有文件夹),然后我们调用 shutdown(),直到所有内容都提交给 Process0,然后关闭,这才会完成。在将所有内容发送到 Process1 之前,Process0 上的 () 不会成功,依此类推。

 //Init Services
 services.add(songLoaderService);
 services.add(Process1.getExecutorService());
 services.add(Process2.getExecutorService());
 services.add(Process3.getExecutorService());

 for (ExecutorService service : services)
     //Request Shutdown
     service.shutdown();

     //Now wait for all submitted tasks to complete
     service.awaitTermination(10, TimeUnit.DAYS);
 }
 //...............
 //Finish Off work

但是,如果所有内容都在同一个 ES 上并且 Process1 正在提交给 Process2,则这将不再起作用,因为在调用 shutdown() 时,并非 Process1 会提交给 Process2 的所有文件夹,因此它会提前关闭。

那么,当该 ES 上的任务可以提交给同一 ES 上的其他任务时,如何使用单个 ExecutorService 检测所有工作何时完成?

或者有更好的方法吗?

注意,你可能会想为什么他不将 Process1,2 & 3 的逻辑合并到一个 Process 中。困难在于,虽然我最初按文件夹对歌曲进行分组,但有时歌曲会被分成较小的组,并且它们会被分配到行内的单独进程中,并且不一定是相同的进程,实际上总共有 12 个进程。

基于Sholms想法的尝试

主线程

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
    private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
    ...
    SongLoader loader = SongLoader.getInstanceOf(parentFolder);
    ExecutorService songLoaderService =  SongLoader.getExecutorService();
    songLoaderService.submit(loader);
    for(Future future : futures)
    {
        try
        {
             future.get();
        }
        catch (InterruptedException ie)
        {
            SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
            getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
        }
        catch(ExecutionException e)
        {
            SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
        }
    }
    songLoaderService.shutdown();

使用 MainAnalyserService 中的此函数提交新任务的流程代码

public void submit(Callable<Boolean> task) //throws Exception
{
    FixSongsController.getFutures().add(getExecutorService().submit(task));
}

看起来可以正常工作,但失败了

java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
    at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

现在我意识到,我无法让一个线程调用 future.get() (等待完成),同时其他线程正在添加到列表中。

最佳答案

我同意 Shloim 的观点,即您不需要多个 ExecutorService 实例,只需一个实例(大小取决于您可用的 CPU 数量)就足够了,而且实际上是最佳的。实际上,我认为你可能不需要 ExecutorService;如果您使用信号完整性的外部机制,一个简单的Executor就可以完成这项工作。

我将首先构建一个类来表示整个更大的工作项。如果您需要使用每个子工作项的结果,则可以使用队列,但如果您只想知道是否还有工作要做,则只需要一个计数器。

例如,您可以执行以下操作:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private int pendingItems;  // guarded by monitor lock on this instance

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public synchronized void enqueueMoreWork(File file) {
        pendingItems++;
        executor.execute(new FileWork(file, this));
    }

    public synchronized void markWorkItemCompleted() {
        pendingItems--;
        notifyAll();
    }

    public synchronized boolean hasPendingWork() {
        return pendingItems > 0;
    }

    public synchronized void awaitCompletion() {
       while (pendingItems > 0) {
           wait();
       }
    }
}

public class FileWork implements Runnable {
    private final File file;
    private final FolderWork parent;

    public FileWork(File file, FolderWork parent) {
        this.file = file;
        this.parent = parent;
    }

    @Override
    public void run() {
        try {
           // do some work with the file

           if (/* found more work to do */) {
               parent.enqueueMoreWork(...);
           }
        } finally {
            parent.markWorkItemCompleted();
        }
    }
}

如果您担心 pendingItems 计数器的同步开销,您可以使用 AtomicInteger 来代替。然后你需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用 CountDownLatch。这是一个示例实现:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public void enqueueMoreWork(File file) {
        if (latch.getCount() == 0) {
            throw new IllegalStateException(
                "Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
        }
        pendingItems.incrementAndGet();
        executor.execute(new FileWork(file, this));
    }

    public void markWorkItemCompleted() {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0) {
            latch.countDown();
        }
    }

    public boolean hasPendingWork() {
        return pendingItems.get() > 0;
    }

    public void awaitCompletion() {
       latch.await();
    }
}

你可以这样调用它:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的 pendingItems 计数器来跟踪剩余工作量要做的事。

关于java - 我如何知道 ExecutorService 何时完成 ES 上的项目是否可以重新提交到 ES,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56617083/

相关文章:

java - 执行器框架异常行为

java - ExecutorService 固定池卡在单个任务上

Java ThreadPool 概念,以及控制实际线程数的问题

java - 如何获取JRadioButton的ButtonGroup

java - 通用类型的访问者被调用错误的类型

java - Hibernate @事务 session

java - 解析日期重置时区

java - 为什么 ThreadPoolExecutor finalize 调用 shutdown 而不是 shutdownNow

java - 如何删除要在 ConcurrentSkipListMap 中键入的元素?

java - Exchanger 与 CountDownLatch