java - 监视具有多个生产者和消费者的练习(Java)

标签 java multithreading producer-consumer

在java中实现多线程编程的监视器示例我设法编写了这个:

import java.util.*;
import java.util.concurrent.*;

class Monitor{
    Semaphore s_prod; // Production semaphore
    Semaphore s_rec; // Recolection semaphore
    int capacidad; // capacity of the queue
    ArrayList<Integer> cola; // my queue

    Monitor(int n){
        this.capacidad = n;
        this.cola = new ArrayList<Integer>(this.capacidad);
        this.s_prod = new Semaphore(this.capacidad);
        this.s_rec = new Semaphore(0);
    }

    public void Producir(int n){ // Producing
        try {
            this.s_prod.acquire();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        this.cola.add(n);
        System.out.println("|Prod:" + n + " | " + cola); // Printing production
        this.s_rec.release();
    }

    public int Recolectar(){ // Recolecting
        try {
            this.s_rec.acquire();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        int temp = this.cola.get(0);
        this.cola.remove(0);
        System.out.println("|Rec:" + temp + " | " + cola); // Printing recolection
        this.s_prod.release();
        return temp;
    }
}

class Productor implements Runnable{ // Productor
    Thread t;
    Monitor m;
    int a, b; // Produces number from 'a' to 'b'
    boolean finish; // flag that indicates the end of production

    Productor(String nombre, Monitor m, int i, int j){
        this.t = new Thread(this, "Productor " + nombre);
        this.a = i;
        this.b = j;
        this.m = m;
        finish = false;
    }

    public void run(){
        for(int i = a; i <= b; i++){
            m.Producir(i);
            try{
                Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        }
        this.finish = true;
    }

    public void empezar(){
        this.t.start();
    }
}

class Recolector implements Runnable{ // Consumer
    Thread t;
    Monitor m;

    Recolector(String nombre, Monitor m){
        this.t = new Thread(this, "Recolector " + nombre);
        this.m = m;
    }

    public void run(){
        /*
        while all the producers are still working and, even if they finished, the queue
        needs to be emptied
        */
        while(!(Checking.product_finished && m.cola.size() == 0)){
            m.Recolectar();
            try{
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        }
    }

    public void empezar(){
        this.t.start();
    }
}

class Checking implements Runnable{ // Threads that checks if the producers finished
    public volatile static boolean product_finished;
    Productor[] p_array;

    Checking(Productor[] prods){
        p_array = prods;
        new Thread(this).start();
    }

    public void run(){
        boolean flag = true;
        while(flag){
            flag = false;
            for(Productor p:p_array){
                if(!p.finish){
                    flag = true;
                }
            }
        }
        System.out.println("Produccion finalizada!");
        product_finished = true;
    }
}

public class Test{
    public static void main(String args[]){
        Monitor m = new Monitor(3); // monitor for queue of capacity=3
        Productor p1 = new Productor("P1", m, 10, 20);
        Productor p2 = new Productor("P2", m, -50, -40);
        Recolector r1 = new Recolector("R1", m);
        Recolector r2 = new Recolector("R1", m);
        Recolector r3 = new Recolector("R1", m);

        Productor[] p_array = new Productor[2];
        p_array[0] = p1;
        p_array[1] = p2;

        Checking ch = new Checking(p_array);
        p1.empezar();
        p2.empezar();
        r1.empezar();
        r2.empezar();
        r3.empezar();
    }
}

运行时,输出显示执行的操作和队列状态,但存在一些问题,例如忽略队列的最大长度,同时生成两个元素并意外删除第一个元素,或超出队列范围。我认为需要一个“同步”语句,但我无法在不阻塞整个程序的情况下使其工作。

此外,在“Checker”类中,我使用一个线程来不断检查生产者是否完成了他们的工作,但是我想知道是否有更好的方法来检查它我的电脑过热。

主函数中该值的输出最多只能生成三个数字,并等待空闲位置来生成新的数字

最佳答案

希望我的回答对您有帮助。

首先,您的代码忽略了队列的最大长度,因为您使用的是ArrayList,并且其构造函数参数是initialCapacity,而不是最大容量。一旦ArrayList的大小达到initialCapacity,它就会调整大小。我建议您使用LinkedBlockingQueue其具有最大固定容量。这种类型的队列也非常适合您的任务,因为所有生产者都会等待,直到队列中有可用空间,所有消费者都会等待,直到有可用元素。所以你不需要信号量。

要检查所有制作人是否已完成工作,您可以使用 CompletableFuture它提供了许多有用的方法。

完整的代码如下所示:

public class Producer implements Runnable {

    private BlockingQueue<Integer> queue;
    private int a, b;

    public Producer(BlockingQueue<Integer> queue, int a, int b) {
        this.queue = queue;
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        for (int i = a; i <= b; i++){
            try {
                //producer will wait here if there is no space in the queue
                queue.put(i);
                System.out.println("Put: " + i);
                Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}

public class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;
    public boolean finish = false;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!finish || queue.size() > 0) {
                // consumer will wait here if the queue is empty;
                // we have to poll with timeout because several consumers may pass 
                // here while queue size is less than number of consumers;
                // timeout should be at least equal to producing interval
                Integer temp = queue.poll(3, TimeUnit.SECONDS);
                if (temp != null) {
                    System.out.println("Took: " + temp);
                    Thread.sleep(2000);
                }
            }
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
}

并测试它:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3); //queue of capacity = 3

Producer p1 = new Producer(queue, 10, 20);
Producer p2 = new Producer(queue, -50, -40);

List<Consumer> consumers = new ArrayList<>();
CompletableFuture[] consumersFutures = new CompletableFuture[3];
for (int i = 0; i < 3; i++) {
    Consumer consumer = new Consumer(queue);
    consumers.add(consumer);
    //this static method runs Runnable in separate thread
    consumersFutures[i] = CompletableFuture.runAsync(consumer);
}

CompletableFuture[] producersFutures = new CompletableFuture[2];

producersFutures[0] = CompletableFuture.runAsync(p1);
producersFutures[1] = CompletableFuture.runAsync(p2);

// allOf returns new CompletableFuture that is completed only 
// when the last given future completes
CompletableFuture.allOf(producersFutures).thenAccept(v -> {
    System.out.println("Completed producing!");
    for (Consumer consumer: consumers) {
        consumer.finish = true;
    }
});

// waiting for all consumers to complete
CompletableFuture.allOf(consumersFutures).get();

System.out.println("Completed consuming!");

关于java - 监视具有多个生产者和消费者的练习(Java),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47744155/

相关文章:

java - 如果 ExecutorService 的队列已满会发生什么

java - 使用时间可变的超时机制

c# - 进程退出问题线程未成功退出

java - 生产者消费者终止

c - 生产者消费者问题中的竞争条件

java - 编写一个输入验证循环,要求用户输入 “Yes” 或 “No”

java - 为什么Hibernate会忽略@Column注解的name属性?

c++ - 多线程: Signal vs BusyWait(Polling), 线程间条件变量问题

wpf - 该领域有人使用过 Dispatcher.Yield 吗?

go - 在汇合的kafka go中阅读来自kafka主题的消息时,如何使用确认?