c# - TPL 数据流中 BroadcastBlock 的重复异常

标签 c# exception tpl-dataflow

我正在尝试使用 TPL 数据流来创建管道。到目前为止一切正常,我的管道定义如下(尽管我的问题只是广播公司、submissionSucceeded、submissionFailed):

// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));

// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);

我遇到的问题是异常的传播。因为我的 BroadcastBlock 将其完成(以及任何故障)传播到两个 block ,所以如果确实发生异常,它会传播到两个 block 。因此当我这样做时

Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);

我最终得到一个包含两个异常的聚合异常。现在我能做的最好的就是过滤这些,即:

try
{
    Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
    var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
    Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}

但我想知道是否有更好的方法来做到这一点。即如果只发生一个异常,我只想引发一个异常。我是 Dataflow 的新手,所以只是想了解所有约定。

最佳答案

我编写了一个 TPL DataFlow 示例 ( https://github.com/squideyes/PodFetch ),它采用稍微不同的方法来完成和错误处理。以下是 Program.cs 的第 171 行到第 201 行的相关代码:

    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    scraper.HandleCompletion(fetcher);

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

如您所见,我将几个 TPL block 链接在一起,然后使用 HandleCompletion 扩展方法准备好处理完成:

    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }

非常重要的是,当我完成将对象传递到链中的第一个 block 时,我会调用 scraper.Complete()。这样,HandleCompletion 扩展方法就可以处理延续。而且,由于我正在等待 fetcher(链中要完成的最后一个 block ),因此很容易在 try/catch 中捕获任何由此产生的错误。

关于c# - TPL 数据流中 BroadcastBlock 的重复异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21534037/

相关文章:

c# - 加载 WPF 组合框的数据集

c# - ViewModel 中 UIElements 的 UserControl 会破坏 MVVM?

c# - System.Net.Mail无法加载或初始化请求的服务提供者

c# - 强制任务在当前线程上继续?

c# - 'MSDAORA.1' 提供者未在本地机器上注册

java - 当用户按 'q' 然后按 'ENTER' 时程序运行时退出

java - NAO Robotics学生在AMD 64位平台上编译错误: Can't load IA 32-bit . dll

c# - 使用 TPL 数据流在预定义 block 之上创建可重用的处理逻辑?

c# - 下游 block 中的 TPL 数据流和异常处理

c# - 'base' 和 'this' 关键字之间的区别