我在应用程序内部有原始消息传递系统。生产者可以从一个线程提交消息,而消费者可以在另一个线程中处理消息-设计中只有线程两个线程:一个线程用于消费者,另一个线程用于生产者,并且无法更改此逻辑。
我正在使用ConcurrentLinkedQueue<>
实现来处理消息:
// producer's code (adds the request)
this.queue.add(req);
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
处理逻辑非常快,消费者每秒可能会收到大约X_000_000
请求。但是我发现使用探查器时
queue.poll()
有时非常慢(似乎队列从生产者处接收到许多新项目时)-与未填充新队列而已填满的队列相比,接收到许多新消息时,它的速度要慢大约10倍来自另一个线程的项目。有可能对其进行优化吗?对于这种特殊情况,最佳的
Queue<>
实现是什么(poll()
一个线程,add()
一个线程)?也许自己实现一些简单的队列会更容易些?
最佳答案
消费者在生产者生产时速度较慢,因为每次读取时都会遇到高速缓存未命中的情况,因为始终会出现一个新元素。
如果所有元素都已经存在,则可以将它们一起提取,从而提高了吞吐量。
当忙于等待时,请考虑使用 Thread.onSpinWait()
:尽管它增加了延迟,但它还启用了某些性能优化。
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
Thread.onSpinWait();
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
JDK没有针对SPSC(单生产者单用户)方案优化的队列。有图书馆。您可以使用Agrona或JCTools。实现这些并非易事。// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);
关于java - 仅有一个Java使用者和生产者线程的并发队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62995272/