c# - 两分支数据流网络不完整

标签 c# task-parallel-library tpl-dataflow

该数据流网络有一个 fork ,并产生正确的文本输出和正确的结果。为什么没有完成?

            // Connect multiple blocks
            // source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
            //                                                        |-> multiply2 -> writeListOut
            var source = new BufferBlock<List<int>>();
            var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
            {
                return l.Select(_l => (double)_l).ToList();
            });
            source.LinkTo(convertToDouble);
            Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
            {
                return l.Select(_l => _l * 10.0).ToList();
            };
            var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            convertToDouble.LinkTo(multiply);
            var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
            {
                return l;
            });
            multiply.LinkTo(multiplyBuffer);
            var summation = new TransformBlock<List<double>, double>((List<double> l) =>
            {
                return l.Sum();
            });
            multiplyBuffer.LinkTo(summation);
            var writeOut = new ActionBlock<double>((double d) =>
            {
                Console.WriteLine("Writing out: " + d.ToString());
            });
            summation.LinkTo(writeOut);
            var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            multiplyBuffer.LinkTo(multiply2);
            var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
            {
                Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l => 
                    _l.ToString()).ToList()));
            });
            multiply2.LinkTo(writeListOut);

            source.Post(new List<int> { 1, 2, 3 });

            Task.Run(async () =>
            {
                await Task.Delay(3000);
                Console.WriteLine("posting 2nd...");
                source.Post(new List<int> { 4, 5, 6 });
                source.Complete();
            });

            // Never completes
            try
            {
                writeOut.Completion.Wait();
                writeListOut.Completion.Wait();
            }
            catch (AggregateException ex)
            {
                ex.Handle(e =>
                {
                    Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
                    return true;
                });
            }

我注意到如果省略 Completion.wait() 调用,程序就会返回。网络执行时没有观察到任何错误。

示例输出:

写出列表:100、200、300 写出:60 发帖第2... 写出:150 写出 list :400、500、600 (挂起)

预期输出:

写出列表:100、200、300 写出:60 发帖第2... 写出:150 写出 list :400、500、600 (返回)

最佳答案

在 TPL 中,默认情况下源完成不会传递到其他 block 。

您需要构造一个 System.Threading.Tasks.Dataflow.DataflowLinkOptions 并将其 PropagateCompletion 属性设置为 true,然后将其传递给进入您对 LinkTo 的调用。

或者,您可以按顺序调用所有 block 上的 Complete 方法。

关于c# - 两分支数据流网络不完整,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59217126/

相关文章:

c# - 将任务实例传递给任务的委托(delegate)

c# - TPL 数据流加速?

c# - 在 C# 中将页脚/EXIF(二进制数据)添加到 JPG 图像

c# - 查询表只返回null返回sqlite-net-pcl中的列表

c# - 从 IEnumerable<T> 中删除项目

c# - 多个短期 TPL 数据流与单个长期运行的数据流

c# - 我应该怎么做才能在 .NET 2.0 中使用 Task<T>?

c# - 如何跨越多个 TPL 数据流 block 的 MaxDegreeOfParallelism?

c# - .NET 中的并行抓取

c# - 将日期时间偏移应用于美国东部时区