我有一种数据流,可以用几种不同的方式处理...所以我想将我收到的每条消息的副本发送到多个目标,以便这些目标可以并行执行...但是,我需要设置BoundedCapacity在我的数据块上,因为数据的流传输速度比我的目标处理它们的速度快,并且有大量数据。没有BoundedCapacity,我将很快耗尽内存。
但是问题是,如果目标无法处理广播广播块(由于BoundedCapacity),它将丢弃消息。
我需要的是一个BroadcastBlock,它不会丢弃消息,但实际上会拒绝其他输入,直到它可以将消息传递到每个目标然后为更多目标做好准备为止。
是否存在类似的内容,或者有人编写了以这种方式运行的自定义块?
最佳答案
使用ActionBlock
和SendAsync()
构建您要的内容非常简单,例如:
public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
IEnumerable<ITargetBlock<T>> targets)
{
var targetsList = targets.ToList();
return new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
这是最基本的版本,但是扩展它以支持可变的目标列表,传播完成或克隆功能应该很容易。
关于task-parallel-library - TPL数据流中保证交付的BroadcastBlock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22127660/