java - 阻塞队列和多线程消费者,如何知道何时停止

标签 java multithreading producer-consumer blockingqueue

我有一个单线程生产者,它创建一些任务对象,然后将这些对象添加到 ArrayBlockingQueue(它是固定大小的)中。

我还启动了一个多线程消费者。这是作为固定线程池构建的 (Executors.newFixedThreadPool(threadCount);)。然后我将一些 ConsumerWorker 实例提交给这个 threadPool,每个 ConsumerWorker 都有一个对上述 ArrayBlockingQueue 实例的引用。

每个这样的 Worker 都会在队列上做一个 take() 并处理任务。

我的问题是,让 Worker 知道什么时候没有更多工作要做的最佳方式是什么。换句话说,我如何告诉Workers生产者已经完成加入队列,并且从这一点开始,每个worker看到队列为空时应该停止。

我现在得到的是一个设置,在该设置中,我的 Producer 使用回调进行初始化,该回调在他完成工作(将内容添加到队列中)时触发。我还保留了我创建并提交到 ThreadPool 的所有 ConsumerWorkers 的列表。当生产者回调告诉我生产者完成时,我可以告诉每个 worker 。在这一点上,他们应该继续检查队列是否为空,当队列为空时,他们应该停止,从而允许我优雅地关闭 ExecutorService 线程池。是这样的

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是我有一个明显的竞争条件,有时生产者会完成,发出信号,而 ConsumerWorkers 会在消耗队列中的所有内容之前停止。

我的问题是最好的同步方式是什么,以便一切正常?我是否应该同步检查生产者是否正在运行的整个部分,以及队列是否为空,并在一个 block 中从队列中取出一些东西(在队列对象上)?我应该只在 ConsumerWorker 实例上同步 isRunning boolean 值的更新吗?还有什么建议吗?

更新,这是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这在很大程度上受到了 JohnVint 在下面的回答的启发,只做了一些小的修改。

=== 由于@vendhan 的评论而更新。

感谢您的关注。你是对的,这个问题中的第一段代码(在其他问题中)有一个 while(isRunning || !inputQueue.isEmpty()) 没有真正意义的地方。

在我的实际最终实现中,我做了一些更接近您替换“||”的建议(or) with "&&"(and),意思是每个 worker (消费者)现在只检查他从列表中得到的元素是否是毒丸,如果是的话就停止(所以理论上我们可以说 worker 有正在运行且队列不能为空)。

最佳答案

您应该继续从队列中take()。您可以使用毒丸来告诉 worker 停下来。例如:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}

关于java - 阻塞队列和多线程消费者,如何知道何时停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8974638/

相关文章:

java - getSupportedCipherSuites 与 getDefaultCipherSuites Apache CXF

java - Arrays.toString,没有库

c++ - 危害指标的内存排序

c# - 确定性监视器脉冲/等待并在生产者-消费者收集中实现超时

java - 如何在没有外部jar的情况下使用Java读取excel文件

java - TreeMap 在不知道字符串的情况下查找字符串类别的值?

c# - 线程有等待语句吗?

multithreading - I/O 完成端口 (Windows) 或异步 I/O (AIO) 是否会提高处理大量请求的多线程服务器的性能?

c# - 并行订购消耗品

java - Kafka集群zookeeper故障处理