java - 同一 ExecutorService 中的多个生产者和单个消费者

标签 java multithreading

问题陈述:从无限的整数流中识别两个连续的整数,这些整数由多个生产者生成,但当相同的数字再次重复时,单个消费者会发出警报。

我有多个生产者和一个消费者。如果我将 Consumer 提交到相同的 ExecutorService , Consumer 不会启动。但是,如果我在单独的线程中运行 Consumer,Consumer 线程将按预期启动。

代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class FixedBlockingQueue {
    final BlockingQueue<Integer> queue;
    private int capacity;

    public FixedBlockingQueue(int capacity){
        super();
        this.capacity = capacity;
        queue = new ArrayBlockingQueue<Integer>(capacity);
        System.out.println("Capactiy:"+this.capacity);
    }
    public void addElement(Integer element){
        try{
            queue.put(element);
        }catch(Exception err){
            err.printStackTrace();
        }
    }
    public void startThreads(){
        ExecutorService es = Executors.newFixedThreadPool(1);
        for ( int i =0; i < 10; i++){
            es.submit(new MyProducer(this));
        }
        //es.submit(new MyConsumer(queue));
        new Thread(new MyConsumer(this)).start();
    }
    public BlockingQueue<Integer> getQueue(){
        return queue;
    }
    public static void main(String args[]){
        FixedBlockingQueue f = new FixedBlockingQueue(1);
        f.startThreads();
    }
}

class MyProducer implements Runnable{

    private FixedBlockingQueue queue;
    public MyProducer(FixedBlockingQueue queue){
        this.queue = queue;     
    }
    public void run(){
        for ( int i=1; i< 5; i++){
            queue.addElement(new Integer(i));
            System.out.println("adding:"+i);
        }
    }
}

class MyConsumer implements Runnable{
    private BlockingQueue<Integer>  queue;
    Integer firstNumber = 0;
    private final ReentrantLock lock = new ReentrantLock();

    public MyConsumer(FixedBlockingQueue fQueue){
        this.queue = fQueue.getQueue();
    }
    /* TODO : Compare two consecutive integers in queue are same or not*/
    public void run(){  
        Integer secondNumber = 0;
        while ( true){
            try{
                lock.lock();
                System.out.println("queue size:"+queue.size());
                if ( queue.size() > 0) {
                    secondNumber = queue.remove();
                    System.out.println("Removed:"+secondNumber);
                    System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber);
                    if ( firstNumber.intValue() ==  secondNumber.intValue()){
                        System.out.println("Numbers matched:"+firstNumber);
                    }
                    firstNumber = secondNumber;
                }
                Thread.sleep(1000);
            }catch(Exception err){
                err.printStackTrace();
            }finally{
                lock.unlock();
            }
        }
    }
}

输出:

Capactiy:1
adding:1

如果我更改代码

es.submit(new MyConsumer(queue));
//new Thread(new MyConsumer(queue)).start();

//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(queue)).start();

消费者线程正常启动。

输出:

Capactiy:1
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:0:1
adding:2
queue size:1
Removed:2
Numbers:Num1:Num2:1:2
adding:3
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
adding:4
queue size:1
Removed:4
Numbers:Num1:Num2:3:4
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:4:1
adding:2
queue size:1
Removed:2
adding:3
Numbers:Num1:Num2:1:2
queue size:1
Removed:3
Numbers:Num1:Num2:2:3

第一种方法:

我知道消费者不会消耗该数字,但理想情况下它不应阻止其他生产者任务的提交。

如果是这样的话,使用ExecutorService来替代简单的Threads不能达到100%的效果吗?

最佳答案

您使用单个线程和固定容量 1 的 BlockingQueue 创建一个线程池。然后,您向该池提交三个任务:前两个任务分别尝试将五个值排入队列,然后是一个当任何值可用时将其出列。

因为您的固定大小池只有一个线程,所以您提交给它的任务将按顺序运行,而不是并行运行。您首先提交一个生产者任务,因此它首先运行。但是,一旦将第一个数字放入队列,它就无法取得任何进一步的进展,因为队列已满。并且队列将永远保持满状态,因为生产者任务必须在池线程可用于其他任务(例如消费者)之前完成。

我不确定为什么你要为此使用线程池,因为直接进行线程管理并不难,特别是因为你的任务已经实现了Runnable。但是,如果您确实使用池,请确保其中有足够的线程来同时容纳所有任务。

另请注意,BlockingQueue 实现应该是线程安全的,标准库提供的所有实现确实都是如此。因此,您不需要在 addElement() 中执行自己的锁定。此外,如果您确实需要执行自己的锁定,那么您不仅需要在将元素入队时执行此操作,还需要在将它们出队时执行此操作。

此外,非常奇怪的是,您的生产者任务通过 FixedBlockingQueue 实例间接向底层队列添加元素,但消费者任务直接进入底层队列。

您的 FixedBlockingQueue 类的名称选择不当,因为它暗示该类实现了 BlockingQueue,但该类实际上并未这样做。

关于java - 同一 ExecutorService 中的多个生产者和单个消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37795707/

相关文章:

java - 将 wait 和 notifyAll 代码转换为使用 Java 1.5 Lock 对象

Android通过处理程序在单独的线程上更新TextView

c++ - 使用 sendfile() 通过线程或其他高效的复制文件方法复制文件

c++ - 来自非 qt 线程的信号 qt,QueuedConnection

java - 为什么 j.u.c.CopyOnWriteArrayList 在方法内部创建局部锁变量

java - Jsoup 从下拉菜单中选择元素

java - JSch sftp 传输剥离 Windows 行结尾

java - 如何在Struts2中将JSP目标设置为父框架?

java - RHEL Eclipse 无法加载 JNI 共享库 libjvm.so

android - 在 Kotlin 中,处理程序线程会等待另一个线程完成吗?