c# - fork TPL 数据流管道时如何正确等待完成?

标签 c# tpl-dataflow

<分区>

我有一个 TPL 数据流管道,其中一个目标 block 链接到两个传播 block ,然后它们都链接到一个源 block 。所有都与 PropagateCompletion = true 链接。第一个传播 block 与仅接受偶数的过滤器链接,其中第二个传播 block 接受所有剩余消息。

发布最后一条消息后,我将第一个 block 设置为已完成。虽然似乎存在竞争条件。最终 block 有时似乎处理所有值,但有时只处理第一个传播 block 接受的值,而第二个传播 block 只接受部分值。

我觉得存在竞争条件。但是我不知道如何正确地指示最终源 block 一切都已完成,只有在链接到它的两个传播 block 都转发了它们的所有消息之后。

这是我的代码简化成一个简单的例子:

    internal static class Program
    {
        public static async Task Main(string[] args)
        {
            var linkOptions = new DataflowLinkOptions
            {
                PropagateCompletion = true
            };
            var bufferBlock = new BufferBlock<int>();
            var fork1 = new TransformBlock<int, int>(n => n);
            var fork2 = new TransformBlock<int, int>(n =>
            {
                Thread.Sleep(100);
                return n;
            });
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock, linkOptions);
            fork2.LinkTo(printBlock, linkOptions);
            
            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }
            bufferBlock.Complete();

            await printBlock.Completion;
        }
    }

这个输出:

2
4
6
8
10

我希望它输出:

2
4
6
8
10
1
3
5
7
9

最佳答案

您的数据流图中有一个菱形,导致完成通过两个分支中的任何一个传播得更快,从而使最终 block 过早完成。

最后一个 block 的完成可以使用任务延续来定制:

          ...
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock); // no completion propagation
            fork2.LinkTo(printBlock);
           
            Task.WhenAll(fork1.Completion, fork2.Completion)
               .ContinueWith(t => printBlock.Complete(), 
                   CancellationToken.None, 
                   TaskContinuationOptions.ExecuteSynchronously, 
                   TaskScheduler.Default);

            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }

            bufferBlock.Complete();

            await printBlock.Completion;

关于c# - fork TPL 数据流管道时如何正确等待完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72977833/

相关文章:

c# - 当构造函数使用 1 个参数但 base 关键字使用 2 个参数时会发生什么

c# - 用于 C# 开发的 NoSQL FREE alternative (alternative to ravendb)

c# - 将整数转换为单词字符串

c# - 以编程方式触发 MouseLeftButtonDown 事件

c# - 用于 TPL 数据流的 BroadcastCopyBlock 保证交付

c# - TPL 数据流 TransformBlock 执行序列似乎无序/异步

task-parallel-library - TPL 数据流、MaxDegreeOfParallelism 与负载平衡

c# - 未选中匿名访问时无法访问该网站

c# - 链接数据流 block 完成不起作用

c# - TPL Dataflow SingleProducerConstrained 是指源 block 的数量还是它们的并行度?