c# - ConcurrentBag 的预期用途和空作为终止条件

标签 c# multithreading concurrency

"ConcurrentBag(T) is a thread-safe bag implementation, optimized for scenarios where the same thread will be both producing and consuming data stored in the bag." - MSDN

我有这个确切的用例(多个线程既消耗又生产),但我需要能够及时有效地确定袋子何时永久为空(我的线程仅根据消耗的内容进行生产,并且在线程启动之前,包会使用单个元素来启动)。

我很难找到一种无竞争条件且无需全局锁的有效方法来执行此操作。我相信引入全局锁会抵消使用几乎无锁的 ConcurrentBag 的好处。

我的实际用例是“无序”(二元)树遍历。我只需要访问每个节点,并对每个节点进行一些非常简单的计算。我不关心它们被访问的顺序。当所有节点都被访问过时,算法应该终止。

int taskCount = Environment.ProcessorCount;
Task[] tasks = new Task[taskCount];
var bag = new ConcurrentBag<TreeNode>();
bag.Add(root);
for (int i = 0; i < taskCount; i++)
{
    int threadId = i;
    tasks[threadId] = new Task(() =>
    {
        while(???) // Putting bag.IsEmpty>0 here would be obviously wrong as some other thread could have removed the last node but not yet added the node's "children"
        {
            TreeNode node;
            bool success = bag.TryTake(out node);

            if (!success) continue; //This spinning is probably not very clever here, but I don't really mind it.

            // Placeholder: Do stuff with node

            if (node.Left != null) bag.Add(node.Left);
            if (node.Right != null) bag.Add(node.Right);
        }
    });
    tasks[threadId].Start();
}
Task.WaitAll(tasks);

那么如何为此添加一个有效的终止条件?我不介意当袋子接近空时该条件变得昂贵。

最佳答案

我以前也遇到过这个问题。在检查队列之前,我将线程注册为处于等待状态。如果队列为空并且所有其他线程也在等待,我们就完成了。如果其他线程仍然忙碌,那么就来破解一下,休眠 10 毫秒。我相信可以通过使用某种同步(也许是 Barrier)来解决这个问题,而无需等待。

代码是这样的:

string Dequeue()
{
    Interlocked.Increment(ref threadCountWaiting);
    try
    {
        while (true)
        {
            string result = queue.TryDequeue();
            if (result != null)
                return result;

            if (cancellationToken.IsCancellationRequested || threadCountWaiting == pendingThreadCount)
            {
                Interlocked.Decrement(ref pendingThreadCount);
                return null;
            }

            Thread.Sleep(10);
        }
    }
    finally
    {
        Interlocked.Decrement(ref threadCountWaiting);
    }
}

也许可以用 Barrier 代替 sleep 和计数器维护。我只是没有打扰,这已经够复杂了。

互锁操作是可扩展性瓶颈,因为它们基本上是使用硬件自旋锁实现的。因此,您可能想在方法的开头插入快速路径:

            string result = queue.TryDequeue();
            if (result != null)
                return result;

大多数情况下都会采用快速路径。

关于c# - ConcurrentBag 的预期用途和空作为终止条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34110123/

相关文章:

c# - LINQ 与跳过和采取

c# - 微服务架构中的 ASP.NET 标识

java - Android 线程、锁、并发示例

c# - 如何将 SAML XML token 字符串转换为 SecurityToken 或 ClaimsPrincipal 实例?

ruby - 自定义线程?

c - 我如何在 Windows 内核驱动程序中正确实现线程?

php - 如何在 PHP 中使用并发访问 mysql 表

go - 如何从字符串 traceid 创建 opentelemetry 跨度

memory-management - 分代垃圾回收和增量垃圾回收有什么区别?

C# hashtable 添加我自己创建的类