java - 使用ArrayBlockingQueue和mutex的多线程代码中的疑问

标签 java multithreading mutex blockingqueue

我正在尝试编写多线程代码。但说实话,我不明白从哪里开始。我的头也在撞击。请帮助我。

我的任务是,

  1. 有一个长度为 1 的队列,称为 pending_tasks,其中包含需要进行某些处理的任务。
  2. 还有另一个长度为 1 的队列,称为 completed_tasks,其中包含已完成处理并准备交付的任务。

我的实现思路,

  1. 首先创建两个阻塞队列,pending_taskscompleted_tasks
  2. 一个线程(生产者)始终监听来自外部的任务(如果任务被放入 pending_tasks 中)。
  3. 一个线程(消费者)始终准备好从 pending_tasks 中获取任务并开始处理,然后放入 completed_tasks 中。
  4. 然后再次进入pending_tasks,每当有任务到来时,就开始相同的处理。
  5. 基本上,这是一个单一生产者-单一消费者问题。

我的困惑,

我知道可以使用ArrayBlockingQueue和Mutex来编码。但我不明白我该如何开始。我对互斥体有很好的理解,我从这个link中读到了关于互斥体的内容。 ,并且对阻塞队列也有很好的理解,因为我在这个网站上阅读了很多问题。

能否请您给我一些实现指导,以便我可以编写这段多线程代码。

我已经为此编写了一些代码,但这并没有实现我任务的最终目标。

提前致谢。期待您的友好回复。

编辑编号1

请看我下面的代码。这段代码工作正常,但缺少一项功能。请帮我添加这一点,并提供一些指导。

功能是,

  1. 当生产者线程将一些值放入pending_task队列中时,它会在那里等待一段时间。如果那时消费者将结果提供给消费者,那么就可以了。否则,它会说超时,生产者会获取另一个值并将其放入pending_task队列中,然后相同的进程启动。

请帮助我添加上述功能。我认为我们必须在生产者线程和消费者线程之间进行通信,并且线程通信是通过使用互斥体来完成的(我认为)。请帮助我实现相同的

我的代码,

多线程类

package multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MultiThread {

    public static BlockingQueue<Integer> pending_task;
    public static BlockingQueue<Integer> completed_task;

    public MultiThread(int length) {
        pending_task = new ArrayBlockingQueue<Integer>(length, true);
        completed_task = new ArrayBlockingQueue<Integer>(length, true);
    }
}

制作人等级

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("PRODUCER: Try to put value  " + i + "  in the pending queue");
                MultiThread.pending_task.put(i);
                System.out.println("PRODUCER: Successfully put value  " + i + "  in the pending queue, now its turn to consumer");
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

消费阶层

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("CONSUMER: Try to take value from the pending queue");
                int val = MultiThread.pending_task.take();
                System.out.println("CONSUMER:  Successfully take value, and that is   " + val);
                System.out.println("CONSUMER: Processing starts");
                Thread.sleep(1000);
                System.out.println("CONSUMER: Processing ends");
                System.out.println("CONSUMER: Try to put that  that value in  completed queue, and the value is   " + val);
                MultiThread.completed_task.put(val);
                System.out.println("CONSUMER: Successfully put into completed queue");

                //Serve this value to the corresponding user
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }
}

DeliveryBoy 类

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class DeliveryBoy implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
                int val = MultiThread.completed_task.take();
                System.out.println("DELIVERYBOY:  Succesfully take value from completed queue and the vlue is  " + val);
                //Serve this value to the corresponding user
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }
}

测试类

package multithread;

public class Test {

    public static void main(String[] args) {
        // TODO code application logic here
        MultiThread ml = new MultiThread(1);
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
        new Thread(new DeliveryBoy()).start();
    }
}

最佳答案

来自ArrayBlockingQueue#put

public void put(E e) throws InterruptedException

Inserts the specified element at the tail of this queue, waiting for **space to become available if the queue is full

来自ArrayBlockingQueue#take

public E take() throws InterruptedException

Description copied from interface: BlockingQueue Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

因此,您需要做的就是从线程中调用这些方法。
试试这个(研究 javadoc),当您遇到更具体的问题时,您可以再次询问。

关于java - 使用ArrayBlockingQueue和mutex的多线程代码中的疑问,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16080736/

相关文章:

c++ - 如何实现多线程访问的类锁对象

windows - Windows 上的递归互斥体?

java - 工作小数点数

c++ - 初始化静态成员类(用于信号处理程序)

java - Java 中的多线程不考虑套接字超时?

c++ - 控件到达 Void 的 Non Void 函数的结尾

c# - 当第一次运行被终止时,第二次运行时未捕获系统互斥体的 AbandonedMutexException

java - IDEA编译错误 Unknown version '8' ,不会给编译器任何版本设置

java - spring petcare 示例, Controller 操作如何链接到 jsp 的?

使用 Maven 构建的 jar 中缺少 Java 类