task-parallel-library - TPL数据流中保证交付的BroadcastBlock

标签 task-parallel-library tpl-dataflow

我有一种数据流,可以用几种不同的方式处理...所以我想将我收到的每条消息的副本发送到多个目标,以便这些目标可以并行执行...但是,我需要设置BoundedCapacity在我的数据块上,因为数据的流传输速度比我的目标处理它们的速度快,并且有大量数据。没有BoundedCapacity,我将很快耗尽内存。

但是问题是,如果目标无法处理广播广播块(由于BoundedCapacity),它将丢弃消息。

我需要的是一个BroadcastBlock,它不会丢弃消息,但实际上会拒绝其他输入,直到它可以将消息传递到每个目标然后为更多目标做好准备为止。

是否存在类似的内容,或者有人编写了以这种方式运行的自定义块?

最佳答案

使用ActionBlockSendAsync()构建您要的内容非常简单,例如:

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets)
{
    var targetsList = targets.ToList();

    return new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}


这是最基本的版本,但是扩展它以支持可变的目标列表,传播完成或克隆功能应该很容易。

关于task-parallel-library - TPL数据流中保证交付的BroadcastBlock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22127660/

相关文章:

c# - Task Parallel Library中的BlockingCollection不会自动释放底层实例的引用

c# - TPL 数据流 block 在 UI 线程上运行

c# - 是否有不接受输入但返回输出的 TPL 数据流 block ?

.net - 我怎样才能让一个 IPropagatorBlock<TInput, TOutput> 自动停止?

c# - ActionBlock<T> 与 Task.WhenAll

.net - 是否有像 TransformBlock<TIn, TOut> 这样的数据流 block 允许不按顺序传播项目?

c# - 为什么 Parallel.For 会执行 WinForms 消息泵,如何防止?

c# - 异步 CTP - 环境 CancellationToken 和 IProgress

c# - FIle.ReadAll***Async/WriteAll***Async/AppendAll***Async 方法在哪里?

c# - .NET 任务中的代码最终是否通过 native Windows 线程在 CPU 或 CPU 核心上运行?