生产者消费者应用程序上的 Java 多线程查询

标签 java multithreading producer-consumer

我写了一个简单的java多线程程序。

我对代码有一些疑问。请帮我解答这些问题。

提前致谢!

这是我的代码:

生产者.java

package com.prodcon;
import java.util.Stack;
public class Producer extends Thread {
    DataStorage data;
    MainProcess tempmp;
    public Producer(DataStorage dst, MainProcess mp){
        data = dst;
        tempmp = mp;
    }
    public void run(){
        for(int i = 0; i < 3; i++){
            System.out.println("Thread:"+this.getName()+"called");
            data.PutData();
            /*-------------current states---------------------*/
            System.out.println("Current states of the threads:");
            System.out.println("p1->"+tempmp.p1.getState());
            System.out.println("p2->"+tempmp.p2.getState());
            System.out.println("p3->"+tempmp.p3.getState());
            System.out.println("c1->"+tempmp.c1.getState());
            System.out.println("c2->"+tempmp.c2.getState());
            System.out.println("c3->"+tempmp.c3.getState());
            /*-------------current states---------------------*/

        }
    }
}

消费者.java

  package com.prodcon;

    public class Consumer extends Thread {
        DataStorage data;
        MainProcess tempmp;
        public Consumer(DataStorage dst, MainProcess mp){
            data = dst;
            tempmp = mp;
        }
        public void run(){
            for(int i = 0; i < 3; i++){
                System.out.println("Thread:"+this.getName()+"called");
                data.GetData();
                /*-------------current states---------------------*/
                System.out.println("Current states of the threads:");
                System.out.println("p1->"+tempmp.p1.getState());
                System.out.println("p2->"+tempmp.p2.getState());
                System.out.println("p3->"+tempmp.p3.getState());
                System.out.println("c1->"+tempmp.c1.getState());
                System.out.println("c2->"+tempmp.c2.getState());
                System.out.println("c3->"+tempmp.c3.getState());
                /*-------------current states---------------------*/
            }
        }
    }

数据存储.java

    package com.prodcon;

import java.util.Random;
import java.util.Stack;

import javax.xml.crypto.Data;

public class DataStorage {

    int countofdata;
    Stack<Double> data;

    public DataStorage() {
        countofdata = 0;
        data = new Stack<Double>();
    }

    public synchronized void GetData() {
        while (data.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
            }

        }
        double temp = (double) data.pop();
        //System.out.println("Data poped out:" + temp);
        countofdata++;
        notifyAll();
    }

    public synchronized void PutData() {
        while (true) {
            if (data.size() == 3) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } else {
                break;
            }
        }
        double temp = Math.random();
        data.push(temp);
        //System.out.println("Data inserted in storage:" + temp);
        countofdata--;
        notifyAll();
    }
}

MainProcess.java

    package com.prodcon;

public class MainProcess {

    /**
     * @param args
     */
    DataStorage ProcessData;
    public Producer p1, p2, p3, p4;
    public Consumer c1, c2, c3, c4;
    public MainProcess(){
        ProcessData = new DataStorage();
        p1 = new Producer(ProcessData, this);
        p2 = new Producer(ProcessData, this);
        p3 = new Producer(ProcessData, this);
        c1 = new Consumer(ProcessData, this);
        c2 = new Consumer(ProcessData, this);
        c3 = new Consumer(ProcessData, this);
        p1.setName("p1");
        p2.setName("p2");
        p3.setName("p3");
        c1.setName("c1");
        c2.setName("c2");
        c3.setName("c3");
    }
    public void startprocess(){
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        MainProcess mp1 = new MainProcess();
        mp1.startprocess();

    }

}

这是该程序的输出:

    Thread:p2called
Thread:p3called
Current states of the threads:
Thread:p1called
Current states of the threads:
Thread:c3called
Current states of the threads:
p1->RUNNABLE
p2->BLOCKED
p3->RUNNABLE
c1->BLOCKED
c2->BLOCKED
c3->BLOCKED
Thread:p3called
Current states of the threads:
p1->RUNNABLE
p2->BLOCKED
p3->RUNNABLE
c1->BLOCKED
c2->BLOCKED
c3->BLOCKED
Thread:p3called
Thread:c2called
Thread:c1called
Current states of the threads:
Current states of the threads:
p1->RUNNABLE
Current states of the threads:
p2->BLOCKED
p1->RUNNABLE
p2->BLOCKED
p3->BLOCKED
c1->BLOCKED
c2->BLOCKED
c3->BLOCKED
Thread:p1called
Current states of the threads:
p1->RUNNABLE
p2->BLOCKED
p3->BLOCKED
c1->BLOCKED
c2->BLOCKED
c3->BLOCKED
Thread:p1called
Current states of the threads:
p1->RUNNABLE
p2->BLOCKED
p3->BLOCKED
c1->RUNNABLE
c2->RUNNABLE
c3->BLOCKED
Thread:c1called
Current states of the threads:
p1->BLOCKED
p2->BLOCKED
p3->BLOCKED
c1->RUNNABLE
c2->RUNNABLE
c3->BLOCKED
Thread:c1called
Current states of the threads:
p1->BLOCKED
p2->BLOCKED
p3->BLOCKED
c1->RUNNABLE
c2->RUNNABLE
c3->BLOCKED
Current states of the threads:
p1->RUNNABLE
p2->BLOCKED
p3->BLOCKED
c1->TERMINATED
c2->RUNNABLE
c3->BLOCKED
p1->RUNNABLE
p2->BLOCKED
p3->BLOCKED
c1->TERMINATED
c2->BLOCKED
c3->RUNNABLE
Thread:c3called
Current states of the threads:
p1->TERMINATED
p2->BLOCKED
p3->BLOCKED
c1->TERMINATED
c2->BLOCKED
c3->RUNNABLE
Thread:c3called
Current states of the threads:
p1->TERMINATED
p2->BLOCKED
p3->BLOCKED
c1->TERMINATED
c2->BLOCKED
c3->RUNNABLE
p3->RUNNABLE
p1->BLOCKED
p2->RUNNABLE
p3->RUNNABLE
c1->TERMINATED
c2->BLOCKED
c3->TERMINATED
Thread:p2called
Current states of the threads:
p1->TERMINATED
p2->RUNNABLE
p3->RUNNABLE
c1->TERMINATED
c2->BLOCKED
c3->TERMINATED
Thread:p2called
Current states of the threads:
p1->TERMINATED
p2->RUNNABLE
p3->RUNNABLE
c1->TERMINATED
c2->BLOCKED
c3->TERMINATED
p1->TERMINATED
p2->TERMINATED
p3->RUNNABLE
c1->TERMINATED
c2->RUNNABLE
c3->TERMINATED
Thread:c2called
Current states of the threads:
p1->TERMINATED
p2->TERMINATED
p3->RUNNABLE
c1->TERMINATED
c2->RUNNABLE
c3->TERMINATED
Thread:c2called
Current states of the threads:
p1->TERMINATED
p2->TERMINATED
p3->RUNNABLE
c1->TERMINATED
c2->RUNNABLE
c3->TERMINATED
c1->TERMINATED
c2->TERMINATED
c3->TERMINATED

我的问题是:

1.根据程序,这个进程永远不应该停止...但是有些线程会自动终止,为什么?

2.即使在要求某些线程进入等待状态后,根据输出,也没有线程进入等待状态。为什么以及如何?

3.根据这段代码:生产者-消费者问题和生产者-消费者问题有什么区别 读者-作者问题?

再次感谢!!

最佳答案

第一和第二个问题 - 这段代码由于逻辑而自然退出。尝试在此处添加调试输出以查看生产者何时退出。

if (data.size() == 3) {
    try {
        wait();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
} else {
    System.out.printf("%s: breaking as data size is %d, %d%n", Thread.currentThread().getName(), countofdata, data.size());

    break;
}

另一个可能的问题是:

for(int i = 0; i < 3; i++){
    data.PutData();
}

您只在队列中放入了 3 次。

提示:Java 中生产者-消费者问题的经典实现是 LinkedBlockingQueue

第三个问题。我将尝试向生产者和消费者解释这一点。读写器问题是当您没有生产者,但有许多访问单个共享资源的消费者时。其中一些可以读取资源,有些可以写入资源。可以同时读取,并且写入锁是独占的。

更新

要模拟多次读取,您可以尝试使用

  • LinkedBlockingQueueDataStorage 的性能良好的等效项。
  • 可重入ReadWriteLock
  • 或更新您的解决方案。这是研究示例,我没有测试,但应该为您提供总体思路:
final Object readLock = new Object();      //use objects as locks
final Object writeLock = new Object();
public void GetData() {
    synchronized (readLock) {             // acquire only the read lock
        while (data.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
            }

        }
        double temp = (double) data.pop();
        //System.out.println("Data poped out:" + temp);
        countofdata++;
        System.out.printf("%s: %d, %d%n", Thread.currentThread().getName(), countofdata, data.size());
        notifyAll();        
     }
 }

public void PutData() {
    synchronized (readLock) {       //first, acquire the read lock
        synchronized (writeLock) {  // then acquire the write lock
            while (true) {
                if (data.size() == 3) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                } else {
                    System.out.printf("%s: breaking as data size is %d, %d%n", Thread.currentThread().getName(), countofdata, data.size());

                    break;
                }
            }
            double temp = Math.random();
            data.push(temp);
            //System.out.println("Data inserted in storage:" + temp);
            countofdata--;
            notifyAll();
        }
    }
}

关于生产者消费者应用程序上的 Java 多线程查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19462229/

相关文章:

java - 导入的 Android native 模块始终为 'undefined'

java - Keycloak:未为联合用户保存授予的范围

multithreading - 当线程完成工作时,我是否需要在 JavaFX 中使用线程和任务在后台运行某些东西?

c - 使用信号量编译但未运行的生产者-消费者代码

java - 拆分解析文本以分隔日期和时间android

java - 为什么噪声算法使用 256 个排列值?

java - 如何取消从另一个线程发出http请求的线程

multithreading - hibernate session 线程

Java线程notify() wait()以加快计算速度

ruby - 使用 EventMachine 设置无限循环以生成随机数据