我正在尝试生产者-消费者问题的多个生产者 - 多个消费者用例。 我使用 BlockingQueue 在多个生产者/消费者之间共享公共(public)队列。
下面是我的代码。
制作人
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue inputQueue;
private static volatile int i = 0;
private volatile boolean isRunning = true;
public Producer(BlockingQueue q){
this.inputQueue=q;
}
public synchronized void run() {
//produce messages
for(i=0; i<10; i++)
{
try {
inputQueue.put(new Integer(i));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Produced "+i);
}
finish();
}
public void finish() {
//you can also clear here if you wanted
isRunning = false;
}
}
<小时/>
消费者
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private BlockingQueue inputQueue;
private volatile boolean isRunning = true;
private final Integer POISON_PILL = new Integer(-1);
Consumer(BlockingQueue queue) {
this.inputQueue = queue;
}
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(!inputQueue.isEmpty()) {
try {
Integer queueElement = (Integer) inputQueue.take();
System.out.println("Consumed : " + queueElement.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("Queue ");
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
//you can also clear here if you wanted
isRunning = false;
inputQueue.add(POISON_PILL);
}
}
<小时/>
测试类
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue queue = new ArrayBlockingQueue(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
当我运行以下代码时,我没有看到正确的输出。
我在这里犯了什么错误吗?
最佳答案
你的代码有很多没有意义。我建议您坐下来弄清楚代码为何存在以及它在做什么。
如果您删除了 isFinshed
标志,则不会发生任何变化。
如果您在生产者中删除了 synchronized
的使用,您将拥有并发生产者。将仅在同步块(synchronized block)中访问的字段设置为 volatile 没有任何好处。
如果生产者是并发的,那么共享循环计数器是没有意义的。
通常,生产者会发送毒丸,而消费者不会消费该毒丸。例如如果您有两个消费者,一个可能会添加药丸,另一个可能会服用它。您的消费者会忽略毒丸,因为它会忽略 isFinished
标志。
您不想仅仅因为队列暂时为空就停止消费者。否则,它将看不到生产者生成的所有消息,甚至可能看不到任何消息。
关于java - 多生产者多消费者多线程Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25393938/