java - 消费者线程等待多个队列

标签 java multithreading producer-consumer

我有多个任务队列和一个消费者线程。

Consumer 线程应该在任何队列有任务时唤醒。所以我想知道进行这种交流的最佳方式是什么。

这些是可以解决这个问题的一些实现,并解释了为什么我想找到不同的东西:

  • 实现它的一种方法是使用一些监视器对象并调用 .wait(timeout) 从消费者线程调用 .notify() 来自生产者线程。然而,这种方法使用 wait/notify 这 是一种低级 api,所以我尽可能避免使用它。而且它并不总是正确的,在某些情况下,我们最终可能会等待整个超时,而我们有任务要做( sleep 的理发师问题)。
  • CountDownLatchreset 方法会很好, 但我没有在 java.util.concurrent 中找到类似的东西。实现会相当简单,但是实现新自行车是我比 wait/notify 更想避免的事情。此外,我认为等待整个超时与 wait/notify 方法存在相同的问题。
  • 让生产者将创建的实体包装到一些Task中,并让所有生产者写入同一个队列,这样消费者就可以监听一个队列。我相信这种方法在大多数情况下实际上非常好,但在我的情况下,这部分应用程序具有低延迟要求,因此我必须避免创建新对象(例如这些包装器),而且它会增加队列尾部的争用(而不是一个消费者,他们都会在那里写)这对延迟也不是很好。

那么还有其他的实现方式吗?(可能是使用了一些其他的并发原语)

最佳答案

如何使用任务通知队列,在这种情况下,如果任何任务队列添加项目,它也会将项目添加到通知队列。

以下片段说明了这种方法:

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class Main<T1, T2>  {


  Queue<T1> taskType1Queue = new ArrayBlockingQueue<T1>(10);
  Queue<T2> taskType2Queue= new ArrayBlockingQueue<T2>(10);
  ArrayBlockingQueue<Boolean> notificationQueue= new ArrayBlockingQueue<Boolean>(2);


  public void produceType1(T1 task) {
    new Thread(new Runnable() {
      @Override
      public void run() {
        taskType1Queue.add(task);
        notificationQueue.offer(true);; //does not block if full
      }
    });
  }

  public void produceType2(T2 task) {
    new Thread(new Runnable() {
      @Override
      public void run() {
        taskType2Queue.add(task);
        notificationQueue.offer(true); //does not block if full
      }
    });
  }


  public void consume() {

    try {
      notificationQueue.take();//wait till task1 o task2 has been published

      for(;!Thread.currentThread().isInterrupted();){
        T1 task1 = taskType1Queue.poll();//does not block if queue is empty
        if (task1 != null) {
          //do something
        }
        T2 task2 = taskType2Queue.poll();//does not block if queue is empty
        if (task2 != null) {
          //do something
        }
        if(task1 == null && task2 == null) {
          break;
        }
      }

    } catch (InterruptedException e) {
      System.out.println("Consumer thread done");
      return;
    }

  }

}

关于java - 消费者线程等待多个队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50258103/

相关文章:

java - 如何设置任务不定期执行?

java - 在dropwizard项目中使用ehcache

java - 线程内线程的优先级

go - golang 中的生产者消费者 - 并发与并行?

java - 如何对重复的对象列表进行排序

java - 在简单的 2d 游戏中实现重力

c# - C#/VS2013-为什么这些代码在没有调试的情况下会失败?

java - Unirest 线程泄漏

java - 为什么我的 Disruptor 程序没有充分利用环形缓冲区

c - 在 C 中线程化 : Producer Consumer taking forever to run