c# - 使用 ConcurrentBag 的并行 ForEach 未按预期工作

标签 c# concurrency parallel-processing parallel.foreach

我有这段代码可以处理列表中的项目:

    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };

        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items, (item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });

        Console.ReadKey();
      }

我主要是使用 ConcurrentBag 根据多种条件检查对象是否存在。
期望总是得到这样的输出(顺序可能不同):

Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]

但我有时会得到一个输出,这表明我的代码不是线程安全的:

Processing item [Three]
Processing item [One]
Processing item [Two]

所以我认为 itemsProcess.FirstOrDefault() 会阻塞的假设是错误的。
使用 lock 不会改变任何东西。显然,这里有问题,我真的不明白为什么?

我知道我可以用其他方式“解决”这个问题(一种是在输入 Parallel.ForEach() 之前准备列表),但我真的很想知道为什么 这是行为吗?

最佳答案

您的并行循环中有 2 个独立的操作:FirstOrDefaultAdd

ConcurrentBag 无法确保这两个操作之间的线程安全。

另一种方法是 ConcurrentDictionary,它有一个 GetOrAdd 方法,它只会在键不存在时添加一个项目:

var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
    // Returns existing item with same ID or adds this item
    var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
    if (!object.ReferenceEquals(item, itemProcess))
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
    }
    else
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
});

如果您随后需要将已处理的项目作为 ICollection,可以通过 itemsProcess.Values 访问它们。

关于c# - 使用 ConcurrentBag 的并行 ForEach 未按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63699657/

相关文章:

c# - 方法放在代码中的整体性能

java - JCS 并发错误

java - 具有过期可能性的简单 Java 字符串缓存

c# - 具有多个参数的 Task.Factory.StartNew

c# - 使用 emguCV 3.1.0 在图像集合中查找匹配图像

c# - MVC 中的 Razor 页面出现编译错误,未找到 System.Web.Helpers

c# - 与 Assembly Load(byte[] rawAssembly) 相反的操作

java - Hibernate 在异步 EJB 应用程序中的奇怪行为。竞赛条件?

python - 在python中同时并行化不同的功能

c++ - 在多线程环境中使用 MPI_THREAD_SERIALIZED 时,所有 MPI 调用都需要锁吗?