我使用 ExecutorService
来简化并发多线程程序。取以下代码:
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
Future<..> ... = exService.submit(..);
...
}
在我的情况下,问题是如果所有 NUMBER_THREADS
都被占用,则 submit()
不会阻塞。结果是任务队列被许多任务淹没。这样做的结果是,使用 ExecutorService.shutdown()
关闭执行服务需要很长时间(ExecutorService.isTerminated()
将长时间为假)。原因是任务队列还是挺满的。
目前我的解决方法是使用信号量来禁止 ExecutorService
的任务队列中有许多条目:
...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
semaphore.aquire();
// internally the task calls a finish callback, which invokes semaphore.release()
// -> now another task is added to queue
Future<..> ... = exService.submit(..);
...
}
我确定有更好的更封装的解决方案?
最佳答案
诀窍是使用固定的队列大小并且:
new ThreadPoolExecutor.CallerRunsPolicy()
我也推荐使用 Guava 的 ListeningExecutorService . 这是一个示例消费者/生产者队列。
private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
任何更好的东西,您可能需要考虑像 RabbitMQ 或 ActiveMQ 这样的 MQ,因为它们具有 QoS 技术。
关于java - ExecutorService,避免任务队列太满的标准方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2247734/