我正在寻找一种“整洁”且有效的方法来实现长步骤 1(可以并行化)和步骤 2 的组合,步骤 2 需要按原始顺序(如果可能的话,尽量减少来自第一步的数据保存在 RAM 中),同时允许第二步在第一个对象的第 1 步的数据可用时立即开始,与第 2 步一起用于进一步的数据。
为了对此进行充实并使其更清晰,我需要压缩大量图像(缓慢 - 第 1 步),然后通过网络连接按顺序发送每张图像(第 2 步)。在任何阶段限制 RAM 中准备好的压缩数据 block 的数量也很重要,例如,如果发送 1000 张图像,我想将“完成”但未发送图像的数量限制为(比如)线程数/使用的处理器。
我做过一个“手写”的版本,使用了一个Task对象的数组,但是看起来很乱,我相信其他人一定有类似的需求,那么有没有更“标准”的方式这样做?理想情况下,我想要 Parallel.ForEach 的一个变体,有 2 个委托(delegate) - 一个用于第 1 步,一个用于第 2 步,我希望其中一个标准覆盖(例如包含“localFinal”参数的覆盖)可能有所帮助,但在事实证明,这些最后阶段是“每个线程”,而不是“每个委托(delegate)”。
谁能指出实现这一目标的现有巧妙方法?
最佳答案
您可以结合使用 Plinq(使用 WithDegreeOfParallelism()
来限制第一阶段的并发),以及用于完成 block 的 BlockingCollection。另请注意,它使用 AsOrdered()
来保留原始顺序。
下面举例说明。对于您的实际应用程序,您需要将此处显示的 int
工作项替换为您的工作项类型 - 文件名或包含与每个工作项相关的信息的类。
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static void Main()
{
int maxThreads = 4;
int maxOutputQueueSize = 10;
var workItems = Enumerable.Range(1, 100); // Pretend these are your files
var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
var worker = Task.Run(() => output(outputQueue));
var parallelWorkItems =
workItems
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(maxThreads)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(process);
foreach (var item in parallelWorkItems)
outputQueue.Add(item);
outputQueue.CompleteAdding();
worker.Wait();
Console.WriteLine("Done.");
}
static int process(int value) // Pretend that this compresses the data.
{
Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
Thread.Sleep(250); // Simulate slow operation.
return value; // Return updated work item.
}
static void output(BlockingCollection<int> queue)
{
foreach (var item in queue.GetConsumingEnumerable())
Console.WriteLine($"Output is processing {item}");
Console.WriteLine("Finished outputting.");
}
}
}
请注意如何限制输入队列处理(通过 WithDegreeOfParallelism
)和输出队列的大小(使用 maxOutputQueueSize
参数)。
或者,如果您使用的是 .Net 4.5 或更高版本,您可以查看 TPL Dataflow library这对这种事情有很多支持。如果可以的话,我建议您使用它 - 但在此处的答案中描述它有点太多了。
关于c# - `Parallel.ForEach` 最后一步按定义的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34392004/