我有这段代码可以处理列表中的项目:
static readonly object _Lock = new object();
public class Item
{
public string Name;
public string ID;
}
static void Main(string[] args)
{
var items = new List<Item>
{
new Item { Name = "One", ID = "123" },
new Item { Name = "Two", ID = "234" },
new Item { Name = "Three", ID = "123" }
};
var itemsProcess = new ConcurrentBag<Item>();
Parallel.ForEach(items, (item) =>
{
Item itemProcess = null;
// lock (_Lock)
{
itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
}
if (itemProcess != null)
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
itemsProcess.Add(item);
Console.WriteLine($"Processing item [{item.Name}]");
Thread.Sleep(1000); // do some work...
}
});
Console.ReadKey();
}
我主要是使用 ConcurrentBag
根据多种条件检查对象是否存在。
我期望总是得到这样的输出(顺序可能不同):
Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]
但我有时会得到一个输出,这表明我的代码不是线程安全的:
Processing item [Three]
Processing item [One]
Processing item [Two]
所以我认为 itemsProcess.FirstOrDefault()
会阻塞的假设是错误的。
使用 lock
不会改变任何东西。显然,这里有问题,我真的不明白为什么?
我知道我可以用其他方式“解决”这个问题(一种是在输入 Parallel.ForEach()
之前准备列表),但我真的很想知道为什么 这是行为吗?
最佳答案
您的并行循环中有 2 个独立的操作:FirstOrDefault
和 Add
。
ConcurrentBag
无法确保这两个操作之间的线程安全。
另一种方法是 ConcurrentDictionary
,它有一个 GetOrAdd
方法,它只会在键不存在时添加一个项目:
var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
// Returns existing item with same ID or adds this item
var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
if (!object.ReferenceEquals(item, itemProcess))
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
Console.WriteLine($"Processing item [{item.Name}]");
// do some work...
}
});
如果您随后需要将已处理的项目作为 ICollection
,可以通过 itemsProcess.Values
访问它们。
关于c# - 使用 ConcurrentBag 的并行 ForEach 未按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63699657/