c# - 循环枚举器的 PLINQ 迭代导致死锁

标签 c# .net parallel-processing task-parallel-library plinq

我有一个简单的程序,它迭代作为反馈枚举器实现的无限枚举。我已经在 TPL 和 PLINQ 中实现了这一点。这两个示例都会在可预测的迭代次数后锁定:PLINQ 为 8 次,TPL 为 3 次。如果代码在不使用 TPL/PLINQ 的情况下执行,则运行良好。我已经以非线程安全的方式和线程安全的方式实现了枚举器。如果并行度限制为 1(如示例中的情况),则可以使用前者。非线程安全枚举器非常简单,并且不依赖于任何“花哨的”.NET 库类。如果增加并行度,死锁之前执行的迭代次数就会增加,例如,对于 PLINQ,迭代次数为 8 * 并行度。

这里是迭代器:
枚举器(非线程安全)

public class SimpleEnumerable<T>: IEnumerable<T>
{
    private T _value;
    private readonly AutoResetEvent _releaseValueEvent = new AutoResetEvent(false);

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        while(true)
        {
            _releaseValueEvent.WaitOne();
            yield return _value;
        }
    }

    public void OnNext(T value)
    {
        _value = value;
        _releaseValueEvent.Set();
    }
}

枚举器(线程安全)

public class SimpleEnumerable<T>: IEnumerable<T>
{
    private readonly BlockingCollection<T> _blockingCollection = new BlockingCollection<T>();

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        while(true)
        {
            yield return _blockingCollection.Take();
        }
    }

    public void OnNext(T value)
    {
        _blockingCollection.Add(value);
    }
}

PLINQ 示例:

public static void Main(string[] args)
{
    var enumerable = new SimpleEnumerable<int>();
    enumerable.OnNext(0);

    enumerable
        .Do(i => Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}"))
        .AsParallel()
        .WithDegreeOfParallelism(1)
        .ForEach
        (
            i =>
            {
                Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
                enumerable.OnNext(i+1);
            }
        );
}

TPL 示例:

public static void Main(string[] args)
{
    var enumerable = new SimpleEnumerable<int>();
    enumerable.OnNext(0);

    Parallel.ForEach
    (
        enumerable,
        new ParallelOptions { MaxDegreeOfParallelism = 1},
        i =>
        {
            Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
            enumerable.OnNext(i+1);
        }
    );
}

根据我对调用堆栈的分析,PLINQ 和 TPL 中与分区器相关的方法似乎都发生了死锁,但我不确定如何解释这一点。

通过反复试验,我发现将 PLINQ enumerable 包装在 Partitioner.Create(enumerable, EnumerablePartitionerOptions.NoBuffering) 中可以解决问题,但我不确定为什么发生死锁。

我非常有兴趣找出该错误的根本原因。

请注意,这是一个人为的示例。我不是在寻找对代码的批评,而是寻找为什么发生死锁。具体来说,在 PLINQ 示例中,如果注释掉 .AsParallel().WithDegreeOfParallelism(1) 行,则代码可以正常工作。

最佳答案

您实际上没有值的逻辑序列,因此尝试创建 IEnumerable首先根本没有任何意义。此外,您几乎肯定不应该尝试创建 IEnumerator可以被多个线程使用。存在疯狂,仅仅因为 IEnumerator 的界面暴露并没有真正暴露你想要的东西。您可以创建一个 IEnumerator它只会被单个线程使用,该线程根据多个线程使用的底层数据源计算要返回的数据,因为这是相当不同的。

如果您只是想创建在不同线程中运行的生产者和消费者,请不要在 BlockingCollection 周围创建自己的“包装器” ,*只需使用 BlockingCollection 。让生产者添加它,让消费者读取它。消费者可以使用GetConsumingEnumerable如果它只想在获取这些项目时迭代这些项目(想要执行的常见操作)。

关于c# - 循环枚举器的 PLINQ 迭代导致死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40873020/

相关文章:

c# - 如何在 xaml 中访问 Collection 中的基本数据上下文

.net - 为什么 OnXXX 事件引发器方法不返回值?

matlab - Simulink-Simulation with parfor(并行计算)

.net-4.0 - 如何并行处理 MSMQ 消息

c# - Collection.Add() 的工作流程

c# - 使用 async/await 发送电子邮件并删除资源检查条件

c# - .NET 泛型 : Using an Activator created type as a generic shows wrong Type? 需要解决方法

c# - Parallel.For() 和 Windows 消息循环

c# - 在 C# 中截取特定区域的屏幕截图?

c# - 在 C# 中从 PictureBox 中删除图像