我有多个任务队列和一个消费者
线程。
Consumer
线程应该在任何队列有任务时唤醒。所以我想知道进行这种交流的最佳方式是什么。
这些是可以解决这个问题的一些实现,并解释了为什么我想找到不同的东西:
- 实现它的一种方法是使用一些监视器对象并调用
.wait(timeout)
从消费者线程调用.notify()
来自生产者线程。然而,这种方法使用wait/notify
这 是一种低级 api,所以我尽可能避免使用它。而且它并不总是正确的,在某些情况下,我们最终可能会等待整个超时,而我们有任务要做( sleep 的理发师问题)。 - 像
CountDownLatch
和reset
方法会很好, 但我没有在java.util.concurrent
中找到类似的东西。实现会相当简单,但是实现新自行车是我比wait/notify
更想避免的事情。此外,我认为等待整个超时与wait/notify
方法存在相同的问题。 - 让生产者将创建的实体包装到一些
Task
中,并让所有生产者写入同一个队列,这样消费者就可以监听一个队列。我相信这种方法在大多数情况下实际上非常好,但在我的情况下,这部分应用程序具有低延迟要求,因此我必须避免创建新对象(例如这些包装器),而且它会增加队列尾部的争用(而不是一个消费者,他们都会在那里写)这对延迟也不是很好。
那么还有其他的实现方式吗?(可能是使用了一些其他的并发原语)
最佳答案
如何使用任务通知队列,在这种情况下,如果任何任务队列添加项目,它也会将项目添加到通知队列。
以下片段说明了这种方法:
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
public class Main<T1, T2> {
Queue<T1> taskType1Queue = new ArrayBlockingQueue<T1>(10);
Queue<T2> taskType2Queue= new ArrayBlockingQueue<T2>(10);
ArrayBlockingQueue<Boolean> notificationQueue= new ArrayBlockingQueue<Boolean>(2);
public void produceType1(T1 task) {
new Thread(new Runnable() {
@Override
public void run() {
taskType1Queue.add(task);
notificationQueue.offer(true);; //does not block if full
}
});
}
public void produceType2(T2 task) {
new Thread(new Runnable() {
@Override
public void run() {
taskType2Queue.add(task);
notificationQueue.offer(true); //does not block if full
}
});
}
public void consume() {
try {
notificationQueue.take();//wait till task1 o task2 has been published
for(;!Thread.currentThread().isInterrupted();){
T1 task1 = taskType1Queue.poll();//does not block if queue is empty
if (task1 != null) {
//do something
}
T2 task2 = taskType2Queue.poll();//does not block if queue is empty
if (task2 != null) {
//do something
}
if(task1 == null && task2 == null) {
break;
}
}
} catch (InterruptedException e) {
System.out.println("Consumer thread done");
return;
}
}
}
关于java - 消费者线程等待多个队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50258103/