java - 多生产者多消费者多线程Java

标签 java multithreading producer-consumer blockingqueue

我正在尝试生产者-消费者问题的多个生产者 - 多个消费者用例。 我使用 BlockingQueue 在多个生产者/消费者之间共享公共(public)队列。

下面是我的代码。
制作人

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue inputQueue;
    private static volatile int i = 0;
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue q){
        this.inputQueue=q;
    }

    public synchronized void run() {

        //produce messages
        for(i=0; i<10; i++) 
        {
            try {
                inputQueue.put(new Integer(i));

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Produced "+i);
        }
        finish();
    }

    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
    }

}
<小时/>

消费者

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private BlockingQueue inputQueue;
    private volatile boolean isRunning = true;

    private final Integer POISON_PILL = new Integer(-1);

    Consumer(BlockingQueue queue) {
        this.inputQueue = queue;
    }

    public void run() {
        //worker loop keeps taking en element from the queue as long as the producer is still running or as 
        //long as the queue is not empty:
        while(!inputQueue.isEmpty()) {

            try {
                Integer queueElement = (Integer) inputQueue.take();
                System.out.println("Consumed : " + queueElement.toString());

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Queue ");
    }

    //this is used to signal from the main thread that he producer has finished adding stuff to the queue
    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
        inputQueue.add(POISON_PILL);
    }
}
<小时/>

测试类

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ProducerConsumerService {

    public static void main(String[] args) {

        //Creating BlockingQueue of size 10
        BlockingQueue queue = new ArrayBlockingQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        System.out.println("Producer and Consumer has been started");
    }

}

当我运行以下代码时,我没有看到正确的输出。

我在这里犯了什么错误吗?

最佳答案

你的代码有很多没有意义。我建议您坐下来弄清楚代码为何存在以及它在做什么。

如果您删除了 isFinshed 标志,则不会发生任何变化。

如果您在生产者中删除了 synchronized 的使用,您将拥有并发生产者。将仅在同步块(synchronized block)中访问的字段设置为 volatile 没有任何好处。

如果生产者是并发的,那么共享循环计数器是没有意义的。 通常,生产者会发送毒丸,而消费者不会消费该毒丸。例如如果您有两个消费者,一个可能会添加药丸,另一个可能会服用它。您的消费者会忽略毒丸,因为它会忽略 isFinished 标志。

您不想仅仅因为队列暂时为空就停止消费者。否则,它将看不到生产者生成的所有消息,甚至可能看不到任何消息。

关于java - 多生产者多消费者多线程Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25393938/

相关文章:

c# - 并行订购消耗品

java - 生产者-消费者日志记录服务以不可靠的方式关闭

javascript - 点击后如何加载特定片段?

java - 检查类型是否可分配不适用于原始类型

JavaFX 或 RIA 桌面应用程序(在 DVD 上)也可以在网络上使用吗?

c - 如何正确终止信号处理程序中的线程?

c# - 检查元素是否存在于多线程应用程序的集合中

.net - Monitor.TryEnter(object) 和 Monitor.TryEnter(object, ref bool) 之间存在什么重要区别?

scala - 使用类型类建模生产者-消费者语义?

java - JDBC 返回空结果集