java - 线程池,worker既是生产者又是消费者

标签 java multithreading concurrency producer-consumer java.util.concurrent

我有一个可以异步处理的无限作业队列。每个作业的处理可能会也可能不会触发为此队列创建新作业。

我想要一个由多个工作线程组成的池,从该队列中获取项目并并行处理它们,直到队列为空并且所有工作线程都空闲等待队列中的新作业(因为忙碌的工作人员最终可能会向队列中添加新作业)。

有使用java.util.concurrent的秘诀吗?我可以用它来解决这个特定问题的实现,其中 worker 也是生产者?目前尚不清楚 API 是否以直接方式支持这种场景。

特别是,我希望能够检测终止条件,即当没有更多作业可用(空作业队列)并且不会再生成作业(所有空闲工作线程)时。

编辑

Nam San 下面的答案似乎是最优雅的方法,基本上可以归结为跟踪已提交作业的数量与已完成作业的数量,并使用这些数字相等的情况作为终止条件。

我已经使用 java.util.concurrent 实现了一个完整的示例扩展 ThreadPoolExecutor 的实现为了实现这一点,加上专门的作业队列来接受 Comparable以特定方式排序的实例。

  • TestExecutor.java :扩展 ThreadPoolExecutor 的自定义执行器但还有其他方法来执行可能会创建新作业的作业,以及一个新的等待方法,该方法会等待所有提交的作业完成。
  • WorkUnit.java :一个可比较的、可运行的作业的示例,它可能会创建新的作业并提交给 TestExecutor .
  • Test.java :包含使用 WorkUnit 运行示例的主要方法具有 TestExecutor 的实例.

最佳答案

下面的代码演示了如何使用 Executor 周围的包装类来计算已提交作业的数量,并将其与已完成作业的数量进行比较,以实现您想要的效果。请注意,您的任务必须调用包装类的 execute 方法,并且切勿直接调用底层的 Executor。如果需要的话,扩展下面的包装器来包装 ExecutorService 的“提交”方法应该很简单。

public class ExampleExecutor {

    private final Executor executor;
    private long submitCount = 0;
    private long doneCount = 0;

    public ExampleExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(Collection<Runnable> commands) {
        for (Runnable command : commands) {
            execute(command);
        }
    }

    public synchronized void execute(final Runnable command) {
        submitCount ++;

        executor.execute(new Runnable() {
            public void run() {
                try {
                    command.run();
                } finally {
                    synchronized (ExampleExecutor.this) {
                        doneCount++;
                        if (doneCount == submitCount) {
                            ExampleExecutor.this.notifyAll();
                        }
                    }
                }
            }
        });
    }

    public synchronized void awaitCompletion() throws InterruptedException {
        while (doneCount != submitCount) {
            this.wait();
        }
    }
}

编辑:添加了下面的测试用例来演示如何使用上述代码

public class Test {

    static class Task implements Runnable {
        private final String id;
        private final long repetitions;
        private final long respawnSize;
        private final ExampleExecutor executor;

        public Task(String id, long repetitions, long respawnSize, ExampleExecutor executor) {
            this.id = id;
            this.repetitions = repetitions;
            this.respawnSize = respawnSize;
            this.executor = executor;
        }

        public void run() {
            for (int i = 0; i < respawnSize; i ++) {
                // Spawning new sub tasks
                executor.execute(new Task(id + "-" + i, repetitions/2, 0, null));
            }

            double sum = 0;
            for (int i = 0; i < repetitions; i++) {
                sum += Math.sin(i);
            }

            System.err.println(id + " completed at " + System.currentTimeMillis() + " with sum=" + sum);
        }
    }

    public static void main(String argv[]) throws InterruptedException {
        ExampleExecutor executor = new ExampleExecutor(Executors.newFixedThreadPool(2));
        executor.execute(new Task("0", 2000000, 100, executor));

        System.err.println("main thread awaits completion");
        executor.awaitCompletion();
        System.err.println("main thread recieved completion event");
    }
}

关于java - 线程池,worker既是生产者又是消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13005964/

相关文章:

multithreading - 新的Delphi TActivityIndi​​cator运行在主线程有什么用?

java - 哪种是杀死线程的最简洁和/或最有效的方法

java - 干扰器 : journaling Example

java - 如何修复javac包不存在?

java - 数的前 n 个因数

java - ORMLite 混合 orderBy() 和 orderByRaw()

c++ - VC++ 互斥问题

java - 知道为什么我无法为我的 SavingsAccount 分配唯一的号码吗?

c# - 在关闭主窗体之前关闭侧线程

Go并发、goroutine同步和关闭 channel