Java生产者/消费者,检测处理结束

标签 java queue task producer-consumer

我正在准备一个应用程序,其中单个生产者生成数百万个任务,然后由数量可配置的消费者处理。从生产者到消费者的通信(可能)将基于队列。

从运行生产者/生成任务的线程,我可以用什么方法等待所有任务完成?我宁愿不恢复任何定期轮询以查看我的任务队列是否为空。无论如何,任务队列为空实际上并不能保证最后的任务已经完成。这些任务可能运行时间相对较长,因此很可能在消费者线程仍在愉快地处理时队列为空。

Rgds,马丁

最佳答案

您可能想看看 java.util.concurrent 包。

执行器框架已经提供了通过线程池执行任务的方法。 Future 抽象允许等待任务完成。

将两者放在一起可以让您轻松协调执行,解耦任务、 Activity (线程)和结果。

示例:

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    //dispatch 
    List<Future<Void>> results =  executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    for(Future<Void> result: results){
        result.get();
    }

编辑:使用 CountDownLatch 的替代版本

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    final CountDownLatch latch;

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    latch = new CountDownLatch(tasks.size());

    //dispatch 
    executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    latch.await();

在你的任务中:

    Callable<Void> task = new Callable<Void>()
    {

        @Override
        public Void call() throws Exception
        {
            // TODO: do your stuff

            latch.countDown(); //<---- important part
            return null;
        }
    };

关于Java生产者/消费者,检测处理结束,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8693050/

相关文章:

泛化中的 Java 泛型和 ClassCastException

java - HashMap 持有重复键

java - 寻址未知服务器 ActiveMQ 时连接停止

c++ - 简单,但找不到 : syntax to work with member variables of STL queues with type class

python - 如何在用beat再次运行之前检查celery任务是否已经在运行?

c# - 如何检查所有任务是否已正确完成?

gradle - 如何覆盖 gradle kotlin-dsl 中的任务

java - 测试驱动开发不适合我的类(class)

java - Android LinkedBlockingQueue 取清空列表

java - 了解 Android 中的 Arraylist IndexOutOfBoundsException