"ConcurrentBag(T) is a thread-safe bag implementation, optimized for scenarios where the same thread will be both producing and consuming data stored in the bag." - MSDN
我有这个确切的用例(多个线程既消耗又生产),但我需要能够及时有效地确定袋子何时永久为空(我的线程仅根据消耗的内容进行生产,并且在线程启动之前,包会使用单个元素来启动)。
我很难找到一种无竞争条件且无需全局锁的有效方法来执行此操作。我相信引入全局锁会抵消使用几乎无锁的 ConcurrentBag 的好处。
我的实际用例是“无序”(二元)树遍历。我只需要访问每个节点,并对每个节点进行一些非常简单的计算。我不关心它们被访问的顺序。当所有节点都被访问过时,算法应该终止。
int taskCount = Environment.ProcessorCount;
Task[] tasks = new Task[taskCount];
var bag = new ConcurrentBag<TreeNode>();
bag.Add(root);
for (int i = 0; i < taskCount; i++)
{
int threadId = i;
tasks[threadId] = new Task(() =>
{
while(???) // Putting bag.IsEmpty>0 here would be obviously wrong as some other thread could have removed the last node but not yet added the node's "children"
{
TreeNode node;
bool success = bag.TryTake(out node);
if (!success) continue; //This spinning is probably not very clever here, but I don't really mind it.
// Placeholder: Do stuff with node
if (node.Left != null) bag.Add(node.Left);
if (node.Right != null) bag.Add(node.Right);
}
});
tasks[threadId].Start();
}
Task.WaitAll(tasks);
那么如何为此添加一个有效的终止条件?我不介意当袋子接近空时该条件变得昂贵。
最佳答案
我以前也遇到过这个问题。在检查队列之前,我将线程注册为处于等待状态。如果队列为空并且所有其他线程也在等待,我们就完成了。如果其他线程仍然忙碌,那么就来破解一下,休眠 10 毫秒。我相信可以通过使用某种同步(也许是 Barrier)来解决这个问题,而无需等待。
代码是这样的:
string Dequeue()
{
Interlocked.Increment(ref threadCountWaiting);
try
{
while (true)
{
string result = queue.TryDequeue();
if (result != null)
return result;
if (cancellationToken.IsCancellationRequested || threadCountWaiting == pendingThreadCount)
{
Interlocked.Decrement(ref pendingThreadCount);
return null;
}
Thread.Sleep(10);
}
}
finally
{
Interlocked.Decrement(ref threadCountWaiting);
}
}
也许可以用 Barrier
代替 sleep 和计数器维护。我只是没有打扰,这已经够复杂了。
互锁
操作是可扩展性瓶颈,因为它们基本上是使用硬件自旋锁实现的。因此,您可能想在方法的开头插入快速路径:
string result = queue.TryDequeue();
if (result != null)
return result;
大多数情况下都会采用快速路径。
关于c# - ConcurrentBag 的预期用途和空作为终止条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34110123/