c# - 如何实现允许工作人员重新排队工作的有限并发工作队列

标签 c# multithreading concurrency

我希望实现以下内容:

  • 一个工作队列,我将在其中预先填充一些初始工作项
  • 完成后,我需要启动多个工作任务(或线程),这将使工作从此队列中出列
  • 在处理工作项时,每个工作人员都可以将项目重新排队到队列中
  • 所有工作线程必须保持运行并尝试将新工作项出队,直到 A) 当前没有剩余工作要做(队列为空),并且 B) 确定没有其他工作线程会将任何进一步的工作加入队列(又名,当前正在处理一个项目)

实现这一点的最简单方法是什么,最好使用 C# 中的主要标准数据结构?


我最初的方法是这样的:

var queue = new ConcurrentQueue<WorkItem>();
// ... populate queue with some initial items ...

await Task.WhenAll(Enumerable.Range(0, nWorkers).Select(_ => Task.Run(async () => {
  while(queue.TryDequeue(out var item))
  {
     // Process stuff asynchronously, potentially re-queue work, etc.
  }
}).ToArray());

但是,当然,这不起作用 - 一个工作人员可能会因为 TryDequeue 失败()而终止,但另一个仍然活跃的工作人员将重新排队工作。

channel 似乎也很合适,工作人员可以出队直到 channel 关闭。但根本问题依然存在:如何准确确定平仓条件?哪个工作人员根据什么条件关闭 channel ?

最佳答案

您需要 BlockingCollection<T> Channel<T> 作为队列,具体取决于您希望工作人员同步还是异步。您还需要 int变量来跟踪待处理的工作项。在队列中添加项目之前,必须递增此变量,并在工作项目完成时递减(在 finally 部分中)。递增和递减必须是原子的 ( Interlocked )。当变量减为零时,您知道所有工作都已完成,您可以调用 BlockingCollection<T>.CompleteAdding Channel<T>.Writer.Complete 。您可以在this answer中找到该技术的使用示例。 (ParallelTraverseHierarchical 方法)。

关于c# - 如何实现允许工作人员重新排队工作的有限并发工作队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76790481/

相关文章:

java - ReentrantReadWriteLock 多个读取线程

java - 确保线程被中断

c# - .NET 5.0 打开选择文件夹对话框

c# - 任务和线程有什么区别?

java - 使用同步静态方法时出现 IllegalMonitorStateException : object not locked by thread before wait(),

ios - 在 iOS 中使用 GCD 发生意外冲突

go - 限制运行的并发任务数

c# - Windows 通用项目不支持数据类型 "AnyName"(x :DataType for DataTemplate)

C# Object 将Attributes的值做成json对象

c# - 在生产应用程序中使用 SqlBulkInsert