我希望实现以下内容:
- 一个工作队列,我将在其中预先填充一些初始工作项
- 完成后,我需要启动多个工作任务(或线程),这将使工作从此队列中出列
- 在处理工作项时,每个工作人员都可以将项目重新排队到队列中
- 所有工作线程必须保持运行并尝试将新工作项出队,直到 A) 当前没有剩余工作要做(队列为空),并且 B) 确定没有其他工作线程会将任何进一步的工作加入队列(又名,当前正在处理一个项目)
实现这一点的最简单方法是什么,最好使用 C# 中的主要标准数据结构?
我最初的方法是这样的:
var queue = new ConcurrentQueue<WorkItem>();
// ... populate queue with some initial items ...
await Task.WhenAll(Enumerable.Range(0, nWorkers).Select(_ => Task.Run(async () => {
while(queue.TryDequeue(out var item))
{
// Process stuff asynchronously, potentially re-queue work, etc.
}
}).ToArray());
但是,当然,这不起作用 - 一个工作人员可能会因为 TryDequeue 失败()而终止,但另一个仍然活跃的工作人员将重新排队工作。
channel 似乎也很合适,工作人员可以出队直到 channel 关闭。但根本问题依然存在:如何准确确定平仓条件?哪个工作人员根据什么条件关闭 channel ?
最佳答案
您需要 BlockingCollection<T>
或 Channel<T>
作为队列,具体取决于您希望工作人员同步还是异步。您还需要 int
变量来跟踪待处理的工作项。在队列中添加项目之前,必须递增此变量,并在工作项目完成时递减(在 finally
部分中)。递增和递减必须是原子的 ( Interlocked
)。当变量减为零时,您知道所有工作都已完成,您可以调用 BlockingCollection<T>.CompleteAdding
或 Channel<T>.Writer.Complete
。您可以在this answer中找到该技术的使用示例。 (ParallelTraverseHierarchical
方法)。
关于c# - 如何实现允许工作人员重新排队工作的有限并发工作队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76790481/