c# - 使用 LinkTo 谓词的 TPL 数据流 block

标签 c# unit-testing task-parallel-library

我有一些 block 最终从 TransformBlock 转到基于 LinkTo 谓词的其他三个转换 block 之一。我正在使用 DataflowLinkOptions 传播完成。问题是,当满足谓词并且启动该 block 时,我的管道的其余部分将继续。管道似乎应该先等待这个 block 完成。

代码是这样的:

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
mainBlock.LinkTo(block1, linkOptions, x => x.Status = Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status = Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status = Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

现在,这并不像我所说的那样工作,所以我发现获得我想要的行为的唯一方法是取出 linkOptions 并将以下内容添加到 mainBlock 的 lambda 中。

mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);

    if (input.Status = Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status = Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status = Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }

    return input;
});

那么问题来了,这是让它发挥作用的唯一方法吗?

顺便说一句,这已在我的单元测试中运行,其中运行了一个数据项以尝试调试管道行为。每个 block 都经过了多个单元测试的单独测试。所以在我的管道单元测试中发生的是断言在 block 完成执行之前命中,因此失败。

如果我删除 block2 和 block3 链接并使用 linkOptions 调试测试它工作正常。

最佳答案

您的问题不在于问题中的代码,它可以正常工作:当主 block 完成时,所有三个后续 block 也都被标记为完成。

问题出在结束 block :您也在那里使用 PropagateCompletion,这意味着当前三个 block 中的任何完成时,结束 block 被标记完成。你想要的是在所有三个 block 都完成时将其标记为完成,并且你的答案中的 Task.WhenAll().ContinueWith() 组合做到了这一点(尽管该片段的第一部分是不必要的,那做与 PropagateCompletion 完全相同的事情)。

As it turns out, the link option propagation (at least this is my guess) will propagate the completion for blocks that don't satisfy the predicate in the linkTo.

是的,它总是传播完成。 Completion 没有任何与之关联的项目,因此将谓词应用于它没有任何意义。也许您总是只有一个项目(这并不常见)这一事实让您更加困惑?

If my guess is correct I sort of feel like this is bug or design error in the link option completion propagation. Why should a block be complete if it was never used?

为什么不呢?对我来说,这是完全合理的:即使这次没有带有 Status.Delayed 的项目,您仍然希望完成处理这些项目的 block ,以便任何后续代码可以知道所有延迟的项目都已处理。没有任何东西的事实并不重要。


无论如何,如果您经常遇到这种情况,您可能需要创建一个辅助方法,将多个源 block 同时链接到一个目标 block 并正确传播完成:

public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }

    if (propagateCompletion)
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
}

用法:

new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);

关于c# - 使用 LinkTo 谓词的 TPL 数据流 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25706708/

相关文章:

c# - 大量定时器

unit-testing - 如何让 PEX 自动生成单元测试?

java - 无法在 Spock 中 stub Kafka 生产者记录

c# - 从异步 ApiController 返回即时响应

c# - 创建代表一系列连续任务的任务

c# - 从 WPF 应用程序调用 C++ 代码时如何定义变量?

c# - 无法将类型 'System.Data.Entity.DynamicProxies.ApplicationUser_xxxxxxxx' 的对象强制转换为类型 'System.String'

c# - 如何在 WPF 中自定义组合框按钮的外观

c++ - Google 测试装置中的变量

c# - Task.WaitAll,如何找到导致AggregateException的任务