c# - ParallelQuery.Aggregate 不并行运行的可能原因

标签 c# .net multithreading parallel-processing plinq

非常感谢 PLYNQ 专家的任何帮助!我会花时间查看答案,我在 math.SE 上有更成熟的个人资料。

我有一个 ParallelQuery<List<string>> 类型的对象,其中有 44 个我想并行处理的列表(比如一次五个)。 我的流程有一个签名,如

private ProcessResult Process(List<string> input)

处理后会返回一个结果,是一对 bool 值,如下所示。

    private struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }

        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }

问题给定一个 IEnumerable<List<string>> processMe ,我的 PLYNQ 查询尝试实现此方法:https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx .写成

processMe.AsParallel()
         .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
             (
                 new ConcurrentStack<ProcessResult>,   //aggregator seed
                 (agg, input) =>
                 {                         //updating the aggregate result
                     var res = Process(input);
                     agg.Push(res);
                     return agg;
                 },
                 agg => 
                 {                         //obtain the result from the aggregator agg
                     ProcessResult res;    // (in this case just the most recent result**)
                     agg.TryPop(out res);
                     return res;
                 }
             );

不幸的是,它不是并行运行的,而是顺序运行的。 (** 请注意,此实现没有“意义”,我现在只是想让并行化工作。)


我尝试了一个略有不同的实现,它确实并行运行,但没有聚合。我定义了一个聚合方法(本质上是 ProcessResult 的两个部分上的 bool AND,即聚合([A1, A2], [B1, B2])≡[A1 && B1, A2 && B2])。

private static ProcessResult AggregateProcessResults
        (ProcessResult aggregate, ProcessResult latest)
    {
        bool ini = false, suc = false;
        if (aggregate.ProcessInitialised && latest.ProcessInitialised)
            ini = true;
        if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
            suc = true;


        return new ProcessResult(ini, suc);
    }

并使用 PLYNQ 查询 https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true),
    (res, input)  => Process(input),
    (agg, latest) => AggregateProcessResults(agg, latest),
    agg           => agg

这里的问题是 AggregateProcessResults由于某种原因,代码从未被命中——我对结果的去向一无所知……

感谢阅读,感谢任何帮助:)

最佳答案

您使用的 Aggregate 的重载确实不会按照设计并行运行。您传递种子,然后是步进函数,但步进函数 (agg) 的参数是从上一个 步骤接收到的累加器。出于这个原因,它本质上是顺序的(上一步的结果输入到下一步)并且不可并行化。不确定为什么此重载包含在 ParallelEnumerable 中,但可能是有原因的。

相反,使用另一个重载:

var result = processMe
.AsParallel()
.Aggregate
(
    // seed factory. Each partition will call this to get its own seed
    () => new ConcurrentStack<ProcessResult>(),
    // process element and update accumulator
    (agg, input) =>
    {                                           
        var res = Process(input);
        agg.Push(res);
        return agg;
    },
    // combine accumulators from different partitions
    (agg1, agg2) => {
        agg1.PushRange(agg2.ToArray());
        return agg1;
    },
    // reduce
    agg =>
    {
        ProcessResult res;
        agg.TryPop(out res);
        return res;
    }
);

关于c# - ParallelQuery.Aggregate 不并行运行的可能原因,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47420124/

相关文章:

.NET:User.IsInRole 无法在 Visual Studio 中工作

java - 主线程卡住所有其他线程,包括 java gui 线程

java - 是否建议将此进程拆分为不同的线程?

c# - 并行运行异步方法 8 次

java - Java 中的慢速多线程 - Air Percussion 项目

c# - 异步操作中的异步操作

C# 一次将字符串行合并为 3 组

c# - 如何证明释放了弱引用?

c# - 如何处理 DateTime 的 JSON 数据成员中的空值

c# - c# 中 list<string> 中数据的最大限制是多少?