java - 仅有一个Java使用者和生产者线程的并发队列

标签 java multithreading algorithm concurrency queue

我在应用程序内部有原始消息传递系统。生产者可以从一个线程提交消息,而消费者可以在另一个线程中处理消息-设计中只有线程两个线程:一个线程用于消费者,另一个线程用于生产者,并且无法更改此逻辑。
我正在使用ConcurrentLinkedQueue<>实现来处理消息:

// producer's code (adds the request)
this.queue.add(req);

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    continue;
  }
  if (req.last()) {
    // last request submitted by consumer
    return;
  }
  // function to process the request
  this.process(req);
}
处理逻辑非常快,消费者每秒可能会收到大约X_000_000请求。
但是我发现使用探查器时queue.poll()有时非常慢(似乎队列从生产者处接收到许多新项目时)-与未填充新队列而已填满的队列相比,接收到许多新消息时,它的速度要慢大约10倍来自另一个线程的项目。
有可能对其进行优化吗?对于这种特殊情况,最佳的Queue<>实现是什么(poll()一个线程,add()一个线程)?也许自己实现一些简单的队列会更容易些?

最佳答案

消费者在生产者生产时速度较慢,因为每次读取时都会遇到高速缓存未命中的情况,因为始终会出现一个新元素。
如果所有元素都已经存在,则可以将它们一起提取,从而提高了吞吐量。
当忙于等待时,请考虑使用 Thread.onSpinWait() :尽管它增加了延迟,但它还启用了某些性能优化。

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    Thread.onSpinWait();
    continue;
  }
  if (req.last()) {
    // last request submitted by consumer
    return;
  }
  // function to process the request
  this.process(req);
}
JDK没有针对SPSC(单生产者单用户)方案优化的队列。有图书馆。您可以使用AgronaJCTools。实现这些并非易事。
// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);

关于java - 仅有一个Java使用者和生产者线程的并发队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62995272/

相关文章:

Java - 比较

algorithm - K-d树: nearest neighbor search algorithm with tractable pseudo code

algorithm - 更改随机选择算法对运行时的影响

java - onApplicationEnd - CF 实际上正在关闭吗?

java - 理解 concurrentHashMap

以最平衡的方式分解字符串数组的算法建议

java - 如何将 JTextField 转换为 int 并再次返回?

java - 有谁知道枚举接口(interface)的JNI签名类型吗?

java - JPA:哪一方应该是 m:n 关系中的拥有方?

c# - 线程安全的内存