java批量生产者消费者

标签 java multithreading producer-consumer

我在生产者 - 消费者中遇到问题。 我的要求是:

生产者一起生产100个对象,等待消费者消费。 然后 Consumer 消费这 100 个对象并等待 Producer 生产。 并且这个过程会重复。

条件是,生产者不应该生产直到对象大小为0, 并且消费者在对象大小为 100 之前不应消费。 IE 。 仅批量生产和消费 100 个。

class Producer extends Thread {
private Queue<Integer> queue;
private int maxSize;

public Producer(Queue<Integer> queue, int maxSize, String name) {
    super(name);
    this.queue = queue;
    this.maxSize = maxSize;
}

@Override
public void run() {
    while (true) {
        synchronized (queue) {
            while (queue.size() == maxSize) {
                try {
                    System.out.println("Queue is full, "
                            + "Producer thread waiting for "
                            + "consumer to take something from queue");
                    queue.wait();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            Random random = new Random();
            int i = random.nextInt();
            System.out.println("Producing value : " + i);
            queue.add(i);
            queue.notifyAll();
        }
    }
}

}

class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;

public Consumer(Queue<Integer> queue, int maxSize, String name) {
    super(name);
    this.queue = queue;
    this.maxSize = maxSize;
}

@Override
public void run() {
    while (true) {
        synchronized (queue) {
            while (queue.isEmpty()) {
                System.out.println("Queue is empty,"
                        + "Consumer thread is waiting"
                        + " for producer thread to put something in queue");
                try {
                    queue.wait();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            System.out.println("Consuming value : " + queue.remove());
            queue.notifyAll();
        }
    }
}

}

public class ProdConsReference {
public static void main(String args[]) { 
    Queue<Integer> buffer = new LinkedList<Integer>(); 
    int maxSize = 10; 
    Thread producer = new Producer(buffer, maxSize, "PRODUCER"); 
    Thread consumer = new Consumer(buffer, maxSize, "CONSUMER"); 
    producer.start(); 
    consumer.start(); 
    }
}

输出:

      Queue is empty,Consumer thread is waiting for producer thread to put                             something in queue
      Producing value : 52648529
      Consuming value : 52648529
      Queue is empty,Consumer thread is waiting for producer thread to put something in queue
      Producing value : -2128028718
      Consuming value : -2128028718

任何人都可以指出我到底缺少什么吗? 提前致谢

最佳答案

我假设进行一次练习,所以这是我的 2 美分:

每次添加/删除后,您都会通知其他线程。你不应该这样做。

你想做的是:

制作人:

  1. 检查队列是否为空。
  2. 如果是:生产 100 件商品(不是 1 件!),然后通知消费者,跳转到 4。
  3. 如果没有:WAITING通知
  4. 循环到 1。(不是 2。!)

消费者:

  1. 检查队列是否已满。
  2. 如果是:消耗直到队列为空,然后通知生产者。跳至4。
  3. 如果没有:WAITING通知
  4. 循环到 1。(不是 2。!)

您可能想使用Conditions .

如果这不是练习/作业

那么你应该看看zapl的方法,他在评论中给出了: 使用代表 100 个项目批处理的列表队列。

共享:有工作队列(线程安全数据结构,我建议使用阻塞队列)。

制作人:

  • 获取批处理大小以及要处理的“任务”或项目的总数。我假设总数可以被批量大小整除。否则,您在制作时必须考虑到这一点。
  • 个项目生成到列表中 (=batch)
  • 将批处理(项目列表)放入工作队列中
  • 重复前 2 个步骤,直至达到总数。

消费者:

  • 从工作队列中获取批处理(如果/一旦可用)
  • 批量处理所有项目

请注意,如果您必须保持顺序,您可以只使用一个消费者,或者采取额外的努力来强制对结果进行排序。

关于java批量生产者消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38052072/

相关文章:

c# - C#中的匿名内部类

java - Android Bitmap多线程处理——不是线程安全的?

python - 如何用字典修复多线程/多处理?

java - 如何更快地读取 Kafka

Java并发场景——需要同步还是不需要?

java - 将 JSTL 包含到 Maven 项目中

java - Java中带有监视器的生产者/消费者的多个实例

java Producer-Consumer 并不总是终止

java - 即使文件位于工作目录中,也无法读取 java 中的文件

c# - 为什么 WCF 服务能够处理来自不同进程的调用比来自线程的调用更多