我正在尝试编写多线程代码。但说实话,我不明白从哪里开始。我的头也在撞击。请帮助我。
我的任务是,
- 有一个长度为 1 的队列,称为
pending_tasks
,其中包含需要进行某些处理的任务。 - 还有另一个长度为 1 的队列,称为
completed_tasks
,其中包含已完成处理并准备交付的任务。
我的实现思路,
- 首先创建两个阻塞队列,
pending_tasks
和completed_tasks
。 - 一个线程(生产者)始终监听来自外部的任务(如果任务被放入
pending_tasks
中)。 - 一个线程(消费者)始终准备好从
pending_tasks
中获取任务并开始处理,然后放入completed_tasks
中。 - 然后再次进入
pending_tasks
,每当有任务到来时,就开始相同的处理。 - 基本上,这是一个单一生产者-单一消费者问题。
我的困惑,
我知道可以使用ArrayBlockingQueue和Mutex来编码。但我不明白我该如何开始。我对互斥体有很好的理解,我从这个link中读到了关于互斥体的内容。 ,并且对阻塞队列也有很好的理解,因为我在这个网站上阅读了很多问题。
能否请您给我一些实现指导,以便我可以编写这段多线程代码。
我已经为此编写了一些代码,但这并没有实现我任务的最终目标。
提前致谢。期待您的友好回复。
编辑编号1
请看我下面的代码。这段代码工作正常,但缺少一项功能。请帮我添加这一点,并提供一些指导。
功能是,
- 当生产者线程将一些值放入pending_task队列中时,它会在那里等待一段时间。如果那时消费者将结果提供给消费者,那么就可以了。否则,它会说超时,生产者会获取另一个值并将其放入pending_task队列中,然后相同的进程启动。
请帮助我添加上述功能。我认为我们必须在生产者线程和消费者线程之间进行通信,并且线程通信是通过使用互斥体来完成的(我认为)。请帮助我实现相同的
我的代码,
多线程类
package multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MultiThread {
public static BlockingQueue<Integer> pending_task;
public static BlockingQueue<Integer> completed_task;
public MultiThread(int length) {
pending_task = new ArrayBlockingQueue<Integer>(length, true);
completed_task = new ArrayBlockingQueue<Integer>(length, true);
}
}
制作人等级
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("PRODUCER: Try to put value " + i + " in the pending queue");
MultiThread.pending_task.put(i);
System.out.println("PRODUCER: Successfully put value " + i + " in the pending queue, now its turn to consumer");
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
消费阶层
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("CONSUMER: Try to take value from the pending queue");
int val = MultiThread.pending_task.take();
System.out.println("CONSUMER: Successfully take value, and that is " + val);
System.out.println("CONSUMER: Processing starts");
Thread.sleep(1000);
System.out.println("CONSUMER: Processing ends");
System.out.println("CONSUMER: Try to put that that value in completed queue, and the value is " + val);
MultiThread.completed_task.put(val);
System.out.println("CONSUMER: Successfully put into completed queue");
//Serve this value to the corresponding user
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
DeliveryBoy 类
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DeliveryBoy implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
int val = MultiThread.completed_task.take();
System.out.println("DELIVERYBOY: Succesfully take value from completed queue and the vlue is " + val);
//Serve this value to the corresponding user
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
测试类
package multithread;
public class Test {
public static void main(String[] args) {
// TODO code application logic here
MultiThread ml = new MultiThread(1);
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new DeliveryBoy()).start();
}
}
最佳答案
public void put(E e) throws InterruptedException
Inserts the specified element at the tail of this queue, waiting for **space to become available if the queue is full
public E take() throws InterruptedException
Description copied from interface: BlockingQueue Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
因此,您需要做的就是从线程中调用这些方法。
试试这个(研究 javadoc),当您遇到更具体的问题时,您可以再次询问。
关于java - 使用ArrayBlockingQueue和mutex的多线程代码中的疑问,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16080736/