我的程序中使用了一个 Parallel.ForEach()
语句。它使用一些对象的列表作为输入。我不关心输出顺序,但我需要这个循环以与输入列表中相同的顺序获取输入元素。是否可以使用 Parallel.ForEach() 来实现此目的?
最佳答案
如果您需要保留 IEnumerable<T>
的订单,您可能想要 implement a custom partitioner OrderablePartitioner<T>
的种类。此类的示例代码包括一个简单的示例,该示例以递增的顺序从枚举中一次检索它们。
但是,对于像 ConcurrentQueue<T>
这样的简单生产者-消费者模型来说,这需要大量工作。 :
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
X x;
while (queue.TryDequeue(out x))
{
x.Frob();
}
};
// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
使用此代码,您将保证先进先出的行为,并且您的请求最终几乎按照收到的顺序“即时”处理。一旦并行放置,您将无法保证它们保持顺序。
或者,您可以使用以下方法(限制队列项目数保持固定):
// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
queue,
_ =>
{
X x;
if (queue.TryDequeue(out x))
{
x.Frob();
}
});
如果您想继续在一个线程中生产,并在其他线程中消费,请使用 BlockingCollection<T>
以队列作为其后备集合:
var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());
// add to it
Task.Factory.StartNew( () =>
{
foreach (var x in yourEnumerableOfX)
{
queue.Add(x);
Thread.Sleep(200);
}
// Signal to our consumers we're done:
queue.CompleteAdding();
});
现在我们需要“无限制”消费者,因为我们不确定可能存在多少队列项:
// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
while (!queue.IsCompleted)
{
X x;
try
{
// blocking form, switch to TryTake and maybe Thread.Sleep()
x = queue.Take();
}
catch (InvalidOperationException)
{
// none left
break;
}
x.Frob();
}
};
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
关于.net - Parallel.ForEach 具有有序输入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18718112/