Java producer-consumer problem: making a thread wait for other threads to finish before it finishes

Tags: java multithreading time synchronization producer-consumer

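I am currently working on a multi-threaded producer-consumer problem. Initially there are 1000 bytes available, 500 of which are already taken up by RAM and drivers, leaving 500 bytes for the threads. There are 4 producers, as follows: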

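  1. A thread that starts a 10-second BubbleWitch2 session, which requires 100 bytes of RAM per second.
  2. A thread that starts a 20-second Spotify stream, which requires 250 bytes of RAM per second.
  3. System and management threads, which together require 50 bytes of RAM per second and, once invoked, execute for a random length of time.
  4. A thread that installs a new 2 KB security update, which is stored to disk and requires 150 bytes of RAM per second while installing. Assume the system has enough disk capacity to support this thread.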
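As a sanity check on the figures in the list above: the four producers together ask for 100 + 250 + 50 + 150 = 550 bytes per second, which is more than the 500 bytes left over, so they cannot all hold their RAM at the same time. One possible way to model that budget, which is not part of the posted code, is a counting semaphore with one permit per byte. The `RamBudget` class and its method names below are hypothetical and only illustrate the idea.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper: models the 500 free bytes as semaphore permits.
class RamBudget {
    private final Semaphore freeBytes;

    RamBudget(int totalBytes) {
        freeBytes = new Semaphore(totalBytes, true);
    }

    // Blocks until the requested number of bytes is free.
    void acquire(int bytes) throws InterruptedException {
        freeBytes.acquire(bytes);
    }

    // Non-blocking variant: returns false if the bytes are not free right now.
    boolean tryAcquire(int bytes) {
        return freeBytes.tryAcquire(bytes);
    }

    void release(int bytes) {
        freeBytes.release(bytes);
    }

    int available() {
        return freeBytes.availablePermits();
    }

    public static void main(String[] args) {
        RamBudget ram = new RamBudget(500);
        ram.tryAcquire(100); // BubbleWitch2
        ram.tryAcquire(250); // Spotify
        ram.tryAcquire(50);  // system and management
        System.out.println("Free bytes: " + ram.available());
        // The security update needs 150 bytes, but only 100 are free.
        System.out.println("Security update fits: " + ram.tryAcquire(150));
        ram.release(100);    // BubbleWitch2 ends and returns its RAM
        System.out.println("Security update fits now: " + ram.tryAcquire(150));
    }
}
```

In a threaded version each producer would call `acquire` before its one-second step and `release` afterwards, so a producer that does not fit simply waits for another one to give its bytes back.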

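The program should stop executing once the security update has completed. Ideally this should be achieved without setting thread priorities. It was working earlier, but now when I run the program the security thread finishes in the middle and Spotify finishes last. Are there any errors that could be causing this? I have included my code below. I have not yet assigned all of the byte sizes to the threads and the buffer.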

My main method:

/**
 * Created by User on 10/08/2014.
 */
public class ProducerConsumerTest {
        public static void main(String[] args) throws InterruptedException {
            Buffer c = new Buffer();
            BubbleWitch2 p1 = new BubbleWitch2(c, 1);
            Processor c1 = new Processor(c, 2);
            Spotify p2 = new Spotify(c, 3);
            SystemManagement p3 = new SystemManagement(c,4);
            securityUpdate p4 = new securityUpdate(c,5, p1,p2,p3);

            p1.setName("BubbleWitch2 ");
            p2.setName("Spotify ");
            p3.setName("System Management ");
            p4.setName("Security Update ");

            c1.start();
            p1.start();
            p2.start();
            p3.start();
            p4.start();

            p2.join();
            p3.join();
            p4.join();
            System.exit(0);

        }
    }

My buffer/cubbyhole class:

/**
 * Created by User on 10/08/2014.
 */
class Buffer {
    private int contents;
    private boolean available = false;
    public synchronized int get() {
        while (available == false) {
            try {
                wait();
            }
            catch (InterruptedException e) {
            }
        }
        available = false;
        notifyAll();
        return contents;
    }
    public synchronized void put(int value) {
        while (available == true) {
            try {
                wait();
            }
            catch (InterruptedException e) {
            }
        }
        contents = value;
        available = true;
        notifyAll();
    }
}

My consumer class:

class Processor extends Thread {
    private Buffer cubbyhole;
    private int number;
    public Processor(Buffer c, int number) {
        cubbyhole = c;
        this.number = number;
    }
    public void run() {
        int value = 0;
        for (int i = 0; i < 60; i++) {
            value = cubbyhole.get();
            System.out.println("Processor #"
                    + this.number
                    + " got: " + value);
        }
    }
}

My Spotify producer class:

class Spotify extends Thread {
    private Buffer buffer;
    private int number;
    private int bytes;

    public Spotify(Buffer c, int number) {
        buffer = c;
        this.number = number;
    }

    public void run() {
        for (int i = 0; i < 20; i++) {
            buffer.put(i);
            System.out.println(getName() + this.number
                    + " put: " + i);
            try {
                sleep(1000);
            } catch (InterruptedException e) { }
        }
        System.out.println("*****************************");
        System.out.println("Spotify has finished executing.");
        System.out.println("*****************************");

    }
}

My BubbleWitch2 producer class:

import java.lang.*;
import java.lang.System;
/**
 * Created by User on 10/08/2014.
 */
class BubbleWitch2 extends Thread {
    private Buffer buffer;
    private int number;
    private int bytes;

    public BubbleWitch2(Buffer c, int number) {
        buffer = c;
        this.number = number;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            buffer.put(i);
            System.out.println(getName() + this.number
                    + " put: " + i);
            try {
                sleep(1000);
            } catch (InterruptedException e) { }
        }
        System.out.println("*****************************");
        System.out.println("BubbleWitch2 has finished executing.");
        System.out.println("*****************************");
    }
}

My System Management producer class:

  class SystemManagement extends Thread {
        private Buffer buffer;
        private int number, min = 1, max = 15;
        private int loopCount = (int) (Math.random() * ( max - min ));

        public SystemManagement(Buffer c, int number) {
            buffer = c;
            this.number = number;
        }

        public void run() {
            for (int i = 0; i < loopCount; i++) {
                buffer.put(i);
                System.out.println(getName() + this.number
                        + " put: " + i);
                try {
                    sleep(1000);
                } catch (InterruptedException e) { }
            }
            System.out.println("*****************************");
            System.out.println("System Management has finished executing.");
            System.out.println("*****************************");
        }
    }

My security update class:

/**
 * Created by User on 14/08/2014.
 */
import java.lang.*;
import java.lang.System;

/**
 * Created by User on 11/08/2014.
 */
class securityUpdate extends Thread {
    private Buffer buffer;
    private int number;
    private int bytes = 150;
    private int process = 0;

    public securityUpdate (Buffer c, int number, BubbleWitch2 bubbleWitch2, Spotify spotify, SystemManagement systemManagement) throws InterruptedException {
        buffer = c;
        this.number = number;
        bubbleWitch2.join();
        spotify.join();
        systemManagement.join();
    }

    public void run() {

        for (int i = 0; i < 10; i++) {
            buffer.put(i);
            System.out.println(getName() + this.number
                    + " put: " + i);
            try {
                sleep(1000);
            } catch (InterruptedException e) { }
        }
        System.out.println("*****************************");
        System.out.println("Security Update has finished executing.");
        System.out.println("*****************************");
    }
}

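I would like to make it run last without hard-coding a different number into the loop count, because I will need to calculate in code how long it takes to process 2000 bytes at 150 bytes per second, which would make any hard-coded value irrelevant. Does anyone have any ideas?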
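The duration mentioned above can be derived with a ceiling division instead of being hard-coded. This is a minimal sketch under the assumption that the update is 2000 bytes and progresses in one-second steps of 150 bytes; the `UpdateDuration` class and `secondsNeeded` method are hypothetical names, not part of the posted code.

```java
// Hypothetical helper for computing the security update's loop count.
class UpdateDuration {
    // Whole seconds needed to process totalBytes at bytesPerSecond, rounded up.
    static int secondsNeeded(int totalBytes, int bytesPerSecond) {
        return (totalBytes + bytesPerSecond - 1) / bytesPerSecond;
    }

    public static void main(String[] args) {
        int seconds = secondsNeeded(2000, 150);
        // 13 full steps cover 1950 bytes, so a 14th step is needed for the rest.
        System.out.println("Security update steps: " + seconds); // prints 14
    }
}
```

The result could replace the literal `10` in the security update's `for` loop, so the loop count follows from the sizes rather than from a guess.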

Best answer

Try the modified code below, which uses the simpler Executor framework:

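import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerTest {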
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> c = new ArrayBlockingQueue<Integer>(1);
        CountDownLatch doneSignal = new CountDownLatch(3);

        Processor c1 = new Processor(c, 2, doneSignal);

        BubbleWitch2 p1 = new BubbleWitch2(c, 1, doneSignal);        
        Spotify p2 = new Spotify(c, 3, doneSignal);
        SystemManagement p3 = new SystemManagement(c,4, doneSignal);
        SecurityUpdate p4 = new SecurityUpdate(c,5, doneSignal);

        p1.setName("BubbleWitch2 ");
        p2.setName("Spotify ");
        p3.setName("System Management ");
        p4.setName("Security Update ");

        ExecutorService exec = Executors.newCachedThreadPool();
        exec.submit(c1);
        exec.submit(p1);
        exec.submit(p2);
        exec.submit(p3);        

        Future<?> securityFuture = exec.submit(p4);

        try {
            // get() blocks until the security update task has finished
            securityFuture.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // The consumer loops forever, so interrupt it instead of waiting for it
        exec.shutdownNow();
        exec.awaitTermination(1000, TimeUnit.MILLISECONDS);

        System.exit(0);
    }
}
class Processor extends Thread {
    private BlockingQueue<Integer> cubbyhole;
    private int number;
    private CountDownLatch doneSignal;

    public Processor(BlockingQueue<Integer> c, int number,CountDownLatch doneSignal) {
        cubbyhole = c;
        this.number = number;
        this.doneSignal = doneSignal;
    }
    public void run() {
        int value = 0;
        // Consume until interrupted; the producers decide how many items arrive
        while (true) {
            try {
                value = cubbyhole.take();
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): stop consuming
                return;
            }
            System.out.println("Processor #"
                    + this.number
                    + " got: " + value);
        }
    }
}

class Spotify extends Thread {
    private BlockingQueue<Integer> buffer;
    private int number;
    private int bytes;
    private CountDownLatch doneSignal;

    public Spotify(BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
        buffer = c;
        this.number = number;
        this.doneSignal = doneSignal;
    }

    public void run() {
        for (int i = 0; i < 20; i++) {
            try {
                buffer.put(i);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(getName() + this.number
                    + " put: " + i);           
        }
        System.out.println("*****************************");
        System.out.println("Spotify has finished executing.");
        System.out.println("*****************************");
        doneSignal.countDown();
    }
}

class BubbleWitch2 extends Thread {
    private BlockingQueue<Integer> buffer;
    private int number;
    private int bytes;
    private CountDownLatch doneSignal;

    public BubbleWitch2(BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
        buffer = c;
        this.number = number;
        this.doneSignal = doneSignal;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                buffer.put(i);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            System.out.println(getName() + this.number
                    + " put: " + i);

        }
        System.out.println("*****************************");
        System.out.println("BubbleWitch2 has finished executing.");
        System.out.println("*****************************");
        doneSignal.countDown();
    }
}

class SystemManagement extends Thread {
    private BlockingQueue<Integer> buffer;
    private int number, min = 1, max = 15;
    private int loopCount = (int) (Math.random() * ( max - min ));
    private CountDownLatch doneSignal;

    public SystemManagement(BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
        buffer = c;
        this.number = number;
        this.doneSignal = doneSignal;
    }

    public void run() {
        for (int i = 0; i < loopCount; i++) {
            try {
                buffer.put(i);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            System.out.println(getName() + this.number
                    + " put: " + i);

        }
        System.out.println("*****************************");
        System.out.println("System Management has finished executing.");
        System.out.println("*****************************");
        doneSignal.countDown();
    }
}

class SecurityUpdate extends Thread {
    private BlockingQueue<Integer> buffer;
    private int number;
    private int bytes = 150;
    private int process = 0;
    private CountDownLatch doneSignal;

    public SecurityUpdate (BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
        buffer = c;
        this.number = number;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            doneSignal.await();         
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        for (int i = 0; i < 10; i++) {
            try {
                buffer.put(i);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            System.out.println(getName() + this.number
                    + " put: " + i);

        }
        System.out.println("*****************************");
        System.out.println("Security Update has finished executing.");
        System.out.println("*****************************");
    }
}
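A note on why the original version likely misbehaved: the `securityUpdate` constructor in the question calls `join()` on the three producers, but the constructor runs in `main` before any of them has been started. `Thread.join()` only waits while the target thread is alive, so joining a thread that has not been started returns immediately and the security update ends up running alongside the others. The small demo below illustrates that behavior in isolation; `JoinBeforeStartDemo` is a hypothetical name and the 200 ms sleep is an arbitrary stand-in for real work.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: join() before start() does not wait for anything.
class JoinBeforeStartDemo {
    // Returns { finished after the early join, finished after the late join }.
    static boolean[] run() throws InterruptedException {
        AtomicBoolean finished = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200); // stand-in for a producer's work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            finished.set(true);
        });

        worker.join(); // worker is not alive yet, so this returns at once
        boolean afterEarlyJoin = finished.get();

        worker.start();
        worker.join(); // now this really waits for the worker to complete
        boolean afterLateJoin = finished.get();

        return new boolean[] { afterEarlyJoin, afterLateJoin };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = run();
        System.out.println("Finished after early join: " + result[0]); // false
        System.out.println("Finished after late join:  " + result[1]); // true
    }
}
```

The `CountDownLatch` in the code above avoids this pitfall because `await()` is called inside `run()`, after all threads have been submitted, and only returns once each of the three producers has called `countDown()`.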

Let me know if you have any questions.

Source: this thread on making a thread wait for other threads to finish in a Java producer-consumer problem comes from a question on Stack Overflow: https://stackoverflow.com/questions/25334793/
