我有一个系统的一部分,它在工作线程中处理输入项的 BlockingQueue
,并将结果放在输出项的 BlockingQueue
上,其中相关代码(简化)看起来像这样:
while (running()) {
InputObject a=inputQueue.take(); // Get from input BlockingQueue
OutputObject b=doProcessing(a); // Process the item
outputQueue.put(b); // Place on output BlockingQueue
}
doProcessing
是此代码中的主要性能瓶颈,但队列项的处理可以并行,因为处理步骤都是相互独立的。
因此,我想改进这一点,以便项目可以由多个线程同时处理,但有一个限制,即不得更改输出的顺序(例如,我不能简单地拥有 10 个线程)运行上面的循环,因为这可能会导致输出的顺序根据处理时间而不同)。
用纯粹、惯用的 Java 实现这一目标的最佳方法是什么?
最佳答案
来自 List
的并行流保留顺序:
List<T> input = ...
List<T> output = input.parallelStream()
.filter(this::running)
.map(this::doProcessing)
.collect(Collectors.toList());
PriorityBlockingQueue
如果您的工作项可以相互比较,则可以使用,并且您将等到 running()
为 false
后再从输出队列读取:
outputQueue = new PriorityBlockingQueue<>();
或者您可以在全部处理完毕后订购它们(如果它们可以相互比较):
outputQueue.drainTo(outputList);
outputList.sort(null);
实现比较的一个简单方法是为放入输入队列的每个元素分配一个渐进 ID。
关于java - 从 Java BlockingQueue 并发按顺序处理工作项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66021883/