java - 从 Java BlockingQueue 并发按顺序处理工作项

标签 java performance concurrency queue

我有一个系统的一部分,它在工作线程中处理输入项的 BlockingQueue ,并将结果放在输出项的 BlockingQueue 上,其中相关代码(简化)看起来像这样:

while (running()) {
   InputObject a=inputQueue.take();   // Get from input BlockingQueue
   OutputObject b=doProcessing(a);    // Process the item 
   outputQueue.put(b);                // Place on output BlockingQueue
}

doProcessing 是此代码中的主要性能瓶颈,但队列项的处理可以并行,因为处理步骤都是相互独立的。

因此,我想改进这一点,以便项目可以由多个线程同时处理,但有一个限制,即不得更改输出的顺序(例如,我不能简单地拥有 10 个线程)运行上面的循环,因为这可能会导致输出的顺序根据处理时间而不同)。

用纯粹、惯用的 Java 实现这一目标的最佳方法是什么?

最佳答案

来自 List 的并行流保留顺序:

List<T> input = ...
List<T> output = input.parallelStream()
                .filter(this::running)
                .map(this::doProcessing)
                .collect(Collectors.toList());

PriorityBlockingQueue如果您的工作项可以相互比较,则可以使用,并且您将等到 running()false 后再从输出队列读取:

outputQueue = new PriorityBlockingQueue<>();

或者您可以在全部处理完毕后订购它们(如果它们可以相互比较):

outputQueue.drainTo(outputList);
outputList.sort(null);

实现比较的一个简单方法是为放入输入队列的每个元素分配一个渐进 ID。

关于java - 从 Java BlockingQueue 并发按顺序处理工作项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66021883/

相关文章:

java - StyledLabel 上的文本被截断

javascript - 将 div 附加到正文

java - 哪里需要同步?

c# - "data.Length==0"或 "data==string.Empty"哪个更有效?

linux - 如何在 Linux 上同步多个独立应用程序的文件 I/O?

c++ - 使用shared_ptr实现RCU(读取-复制-更新)?

java - 如何判断程序何时终止?

java - 如何快速获取图像矩阵? java

JAVA 编程复选框到 MySQL

performance - PostgreSQL - BEFORE 触发器是否比 AFTER 触发器更有效?