c# - `Parallel.ForEach` 最后一步按定义的顺序

标签 c# parallel-processing parallel.foreach

我正在寻找一种“整洁”且有效的方法来实现长步骤 1(可以并行化)和步骤 2 的组合,步骤 2 需要按原始顺序(如果可能的话,尽量减少来自第一步的数据保存在 RAM 中),同时允许第二步在第一个对象的第 1 步的数据可用时立即开始,与第 2 步一起用于进一步的数据。

为了对此进行充实并使其更清晰,我需要压缩大量图像(缓慢 - 第 1 步),然后通过网络连接按顺序发送每张图像(第 2 步)。在任何阶段限制 RAM 中准备好的压缩数据 block 的数量也很重要,例如,如果发送 1000 张图像,我想将“完成”但未发送图像的数量限制为(比如)线程数/使用的处理器。

我做过一个“手写”的版本,使用了一个Task对象的数组,但是看起来很乱,我相信其他人一定有类似的需求,那么有没有更“标准”的方式这样做?理想情况下,我想要 Parallel.ForEach 的一个变体,有 2 个委托(delegate) - 一个用于第 1 步,一个用于第 2 步,我希望其中一个标准覆盖(例如包含“localFinal”参数的覆盖)可能有所帮助,但在事实证明,这些最后阶段是“每个线程”,而不是“每个委托(delegate)”。

谁能指出实现这一目标的现有巧妙方法?

最佳答案

您可以结合使用 Plinq(使用 WithDegreeOfParallelism() 来限制第一阶段的并发),以及用于完成 block 的 BlockingCollection。另请注意,它使用 AsOrdered() 来保留原始顺序。

下面举例说明。对于您的实际应用程序,您需要将此处显示的 int 工作项替换为您的工作项类型 - 文件名或包含与每个工作项相关的信息的类。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            int maxThreads = 4;
            int maxOutputQueueSize = 10;
            var workItems = Enumerable.Range(1, 100); // Pretend these are your files
            var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
            var worker = Task.Run(() => output(outputQueue));

            var parallelWorkItems = 
                workItems
                .AsParallel()
                .AsOrdered()
                .WithDegreeOfParallelism(maxThreads)
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(process);

            foreach (var item in parallelWorkItems)
                outputQueue.Add(item);

            outputQueue.CompleteAdding();
            worker.Wait();

            Console.WriteLine("Done.");
        }

        static int process(int value) // Pretend that this compresses the data.
        {
            Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
            Thread.Sleep(250);  // Simulate slow operation.
            return value; // Return updated work item.
        }

        static void output(BlockingCollection<int> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
                Console.WriteLine($"Output is processing {item}");

            Console.WriteLine("Finished outputting.");
        }
    }
}

请注意如何限制输入队列处理(通过 WithDegreeOfParallelism)和输出队列的大小(使用 maxOutputQueueSize 参数)。

或者,如果您使用的是 .Net 4.5 或更高版本,您可以查看 TPL Dataflow library这对这种事情有很多支持。如果可以的话,我建议您使用它 - 但在此处的答案中描述它有点太多了。

关于c# - `Parallel.ForEach` 最后一步按定义的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34392004/

相关文章:

c# - 继承字典的序列化类不是序列化属性

parallel-processing - CUDA 中的忙碌旋转

c# - Parallel.Foreach + yield 返回?

python - 在Python中使用Ray并行化任务,得到 "Aborted (core dumped)"

c# - Parallel.For(Foreach) 将创建多少个线程?默认 MaxDegreeOfParallelism?

c# - 并行执行查询会导致更多时间 [SQL Server]

c# - 为什么Lazy <T>不懒惰?

c# - 将60m记录导入SQL的最快方法是什么

c# - 在 systemd 文件中使用 Environment= 解析 ASP.NET Core 连接字符串

c++ - 并行 vector 调整大小不加速