我有一个可以异步处理的无限作业队列。每个作业的处理可能会也可能不会触发为此队列创建新作业。
我想要一个由多个工作线程组成的池,从该队列中获取项目并并行处理它们,直到队列为空并且所有工作线程都空闲等待队列中的新作业(因为忙碌的工作人员最终可能会向队列中添加新作业)。
有使用java.util.concurrent
的秘诀吗?我可以用它来解决这个特定问题的实现,其中 worker 也是生产者?目前尚不清楚 API 是否以直接方式支持这种场景。
特别是,我希望能够检测终止条件,即当没有更多作业可用(空作业队列)并且不会再生成作业(所有空闲工作线程)时。
编辑
Nam San 下面的答案似乎是最优雅的方法,基本上可以归结为跟踪已提交作业的数量与已完成作业的数量,并使用这些数字相等的情况作为终止条件。
我已经使用 java.util.concurrent
实现了一个完整的示例扩展 ThreadPoolExecutor
的实现为了实现这一点,加上专门的作业队列来接受 Comparable
以特定方式排序的实例。
- TestExecutor.java :扩展
ThreadPoolExecutor
的自定义执行器但还有其他方法来执行可能会创建新作业的作业,以及一个新的等待方法,该方法会等待所有提交的作业完成。 - WorkUnit.java :一个可比较的、可运行的作业的示例,它可能会创建新的作业并提交给
TestExecutor
. - Test.java :包含使用
WorkUnit
运行示例的主要方法具有TestExecutor
的实例.
最佳答案
下面的代码演示了如何使用 Executor
周围的包装类来计算已提交作业的数量,并将其与已完成作业的数量进行比较,以实现您想要的效果。请注意,您的任务必须调用包装类的 execute
方法,并且切勿直接调用底层的 Executor
。如果需要的话,扩展下面的包装器来包装 ExecutorService
的“提交”方法应该很简单。
public class ExampleExecutor {
private final Executor executor;
private long submitCount = 0;
private long doneCount = 0;
public ExampleExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(Collection<Runnable> commands) {
for (Runnable command : commands) {
execute(command);
}
}
public synchronized void execute(final Runnable command) {
submitCount ++;
executor.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
synchronized (ExampleExecutor.this) {
doneCount++;
if (doneCount == submitCount) {
ExampleExecutor.this.notifyAll();
}
}
}
}
});
}
public synchronized void awaitCompletion() throws InterruptedException {
while (doneCount != submitCount) {
this.wait();
}
}
}
编辑:添加了下面的测试用例来演示如何使用上述代码
public class Test {
static class Task implements Runnable {
private final String id;
private final long repetitions;
private final long respawnSize;
private final ExampleExecutor executor;
public Task(String id, long repetitions, long respawnSize, ExampleExecutor executor) {
this.id = id;
this.repetitions = repetitions;
this.respawnSize = respawnSize;
this.executor = executor;
}
public void run() {
for (int i = 0; i < respawnSize; i ++) {
// Spawning new sub tasks
executor.execute(new Task(id + "-" + i, repetitions/2, 0, null));
}
double sum = 0;
for (int i = 0; i < repetitions; i++) {
sum += Math.sin(i);
}
System.err.println(id + " completed at " + System.currentTimeMillis() + " with sum=" + sum);
}
}
public static void main(String argv[]) throws InterruptedException {
ExampleExecutor executor = new ExampleExecutor(Executors.newFixedThreadPool(2));
executor.execute(new Task("0", 2000000, 100, executor));
System.err.println("main thread awaits completion");
executor.awaitCompletion();
System.err.println("main thread recieved completion event");
}
}
关于java - 线程池,worker既是生产者又是消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13005964/