该数据流网络有一个 fork ,并产生正确的文本输出和正确的结果。为什么没有完成?
// Connect multiple blocks
// source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
// |-> multiply2 -> writeListOut
var source = new BufferBlock<List<int>>();
var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
{
return l.Select(_l => (double)_l).ToList();
});
source.LinkTo(convertToDouble);
Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
{
return l.Select(_l => _l * 10.0).ToList();
};
var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
convertToDouble.LinkTo(multiply);
var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
{
return l;
});
multiply.LinkTo(multiplyBuffer);
var summation = new TransformBlock<List<double>, double>((List<double> l) =>
{
return l.Sum();
});
multiplyBuffer.LinkTo(summation);
var writeOut = new ActionBlock<double>((double d) =>
{
Console.WriteLine("Writing out: " + d.ToString());
});
summation.LinkTo(writeOut);
var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
multiplyBuffer.LinkTo(multiply2);
var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
{
Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l =>
_l.ToString()).ToList()));
});
multiply2.LinkTo(writeListOut);
source.Post(new List<int> { 1, 2, 3 });
Task.Run(async () =>
{
await Task.Delay(3000);
Console.WriteLine("posting 2nd...");
source.Post(new List<int> { 4, 5, 6 });
source.Complete();
});
// Never completes
try
{
writeOut.Completion.Wait();
writeListOut.Completion.Wait();
}
catch (AggregateException ex)
{
ex.Handle(e =>
{
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
return true;
});
}
我注意到如果省略 Completion.wait()
调用,程序就会返回。网络执行时没有观察到任何错误。
示例输出:
写出列表:100、200、300 写出:60 发帖第2... 写出:150 写出 list :400、500、600 (挂起)
预期输出:
写出列表:100、200、300 写出:60 发帖第2... 写出:150 写出 list :400、500、600 (返回)
最佳答案
在 TPL 中,默认情况下源完成不会传递到其他 block 。
您需要构造一个 System.Threading.Tasks.Dataflow.DataflowLinkOptions
并将其 PropagateCompletion
属性设置为 true
,然后将其传递给进入您对 LinkTo
的调用。
或者,您可以按顺序调用所有 block 上的 Complete
方法。
关于c# - 两分支数据流网络不完整,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59217126/