c# - 如何使用 TPL 数据流库指定无序执行 block ?

标签 c# task-parallel-library tpl-dataflow

我想设置一个 TransformBlock 来并行处理它的项目。因此,我将 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 设置为 > 1。我不关心消息的顺序,而是关心 documentation。说:

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.

“正确排序”是否意味着如果队列中有一条消息需要较长的处理时间,则在处理完这条消息之前不会输出更多消息?

如果是这样,我如何指定一个不关心顺序的执行 block (例如 TransformBlock)?还是我必须在消费端指定我不关心订购?

最佳答案

库中没有这样的 block ,但您可以通过组合 ActionBlockBufferBlock 轻松自己创建一个。像这样的东西:

public static IPropagatorBlock<TInput, TOutput>
    CreateUnorderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
{
    var buffer = new BufferBlock<TOutput>(options);
    var action = new ActionBlock<TInput>(
        async input =>
        {
            var output = func(input);
            await buffer.SendAsync(output);
        }, options);

    action.Completion.ContinueWith(
        t =>
        {
            IDataflowBlock castedBuffer = buffer;

            if (t.IsFaulted)
            {
                castedBuffer.Fault(t.Exception);
            }
            else if (t.IsCanceled)
            {
                // do nothing: both blocks share options,
                // which means they also share CancellationToken
            }
            else
            {
                castedBuffer.Complete();
            }
        });

    return DataflowBlock.Encapsulate(action, buffer);
}

这样,一旦 ActionBlock 处理了一个项目,它就会立即移动到 BufferBlock,这意味着不会保持顺序。

此代码的一个问题是它没有很好地观察集合 BoundedCapacity:实际上,此 block 的容量是选项中设置的容量的两倍(因为两个 block 中的每一个都有一个单独的容量)。

关于c# - 如何使用 TPL 数据流库指定无序执行 block ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22893908/

相关文章:

c# - 我在哪里可以找到异步队列的当前内容?

c# - 如何跨越多个 TPL 数据流 block 的 MaxDegreeOfParallelism?

c# - TPL 数据流,Post() 和 SendAsync() 之间的功能区别是什么?

c# - SAP B1 AddOn,检查表是否存在和创建表的最佳位置是什么?

c# - 为什么当我尝试访问我的任务的结果属性时此异步操作挂起?

c# - 在不同形式的 dataGridView 中添加列和行

.net - TPL 取消延续从未调用已取消的任务

c# - TPL 数据流 - 随时控制流中的项目

c# - 工作单元、回滚选项

c# - UWP - 如何在后台任务中使用 WriteableBimap?选择?