.net - Parallel.ForEach 具有有序输入?

标签 .net parallel-processing task-parallel-library parallel.foreach

我的程序中使用了一个 Parallel.ForEach() 语句。它使用一些对象的列表作为输入。我不关心输出顺序,但我需要这个循环以与输入列表中相同的顺序获取输入元素。是否可以使用 Parallel.ForEach() 来实现此目的?

最佳答案

如果您需要保留 IEnumerable<T> 的订单,您可能想要 implement a custom partitioner OrderablePartitioner<T> 的种类。此类的示例代码包括一个简单的示例,该示例以递增的顺序从枚举中一次检索它们。

但是,对于像 ConcurrentQueue<T> 这样的简单生产者-消费者模型来说,这需要大量工作。 :

var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
    X x;
    while (queue.TryDequeue(out x))
    {
        x.Frob();
    }
};

// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

使用此代码,您将保证先进先出的行为,并且您的请求最终几乎按照收到的顺序“即时”处理。一旦并行放置,您将无法保证它们保持顺序。

或者,您可以使用以下方法(限制队列项目数保持固定):

// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
    queue,
    _ =>
    {
        X x;
        if (queue.TryDequeue(out x))
        {
            x.Frob();
        }
    });

如果您想继续在一个线程中生产,并在其他线程中消费,请使用 BlockingCollection<T> 以队列作为其后备集合:

var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());

// add to it
Task.Factory.StartNew( () =>
    {
         foreach (var x in yourEnumerableOfX)
         {
             queue.Add(x);
             Thread.Sleep(200);
         }

         // Signal to our consumers we're done:
         queue.CompleteAdding();
    });

现在我们需要“无限制”消费者,因为我们不确定可能存在多少队列项:

// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
    while (!queue.IsCompleted)
    {
        X x;
        try
        {
            // blocking form, switch to TryTake and maybe Thread.Sleep()
            x = queue.Take();
        }
        catch (InvalidOperationException)
        {
            // none left
            break;
        }

        x.Frob();
    }
};

int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

关于.net - Parallel.ForEach 具有有序输入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18718112/

相关文章:

.net - 如果检测到相同版本,则升级 Bootstrap 包

c - 在乒乓测试中使用 pthread 条件变量

c# - 从 Bitmap 类的颜色填充 160x43 字节数组的更快方法

c# - 在 TPL 上执行长时间任务时图像渲染速度缓慢

c# - 在 Windows 服务中使用 ConfigureAwait(false)?

c# - 知道 DLL 在什么上下文中运行

c# - LINQ to Entities 仅支持使用 IEntity 接口(interface)转换 EDM 原语或枚举类型

c# - 有谁知道哪里有可以生成示例信用卡号的 c# 代码或 dll

c# - 在单独的线程上将数据绑定(bind)到网格控件

c# - 为什么 LogicalCallContext 不适用于异步?