c# - 替代具有保证交付的 Dataflow BroadcastBlock

标签 c# multithreading task-parallel-library async-await tpl-dataflow

我需要某种类似于 BroadcastBlock 的对象,但可以保证传送。所以我使用了 this question 的答案.但我并不是很清楚这里的执行流程。我有一个控制台应用程序。这是我的代码:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num => 
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef); 
        }, execopt));

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

每个数字都必须按顺序处理,所以我不能在 broadcaster block 中使用 MaxDegreeOfParallelism。但是所有接收到数字的 Action block 都可以并行运行。

问题来了:

在输出中我可以看到不同的线程 ID。我理解正确吗,它的工作原理如下:

执行命中广播中的 await block.SendAsync(num);。 如果当前 block 尚未准备好接受该号码,则执行退出广播程序并卡在 Task.WaitAll 处。 当 block 接受数字时,broadcaster 中的 foreach 语句的其余部分在线程池中执行。 直到最后都一样。 foreach 的每次迭代都在线程池中执行。但实际上它是顺序发生的。

我的理解是对还是错? 我如何更改此代码以将数字异步发送到所有 block ?

为了确保如果其中一个区 block 目前还没有准备好接收号码,我不会等待它,所有其他准备就绪的区 block 都会收到号码。并且所有 block 都可以并行运行。并保证交期。

最佳答案

假设您希望 broadcaster 一次处理一个项目,同时使目标 block 能够同时接收该项目,您需要更改 broadcaster 以提供数量同时到所有 block ,然后异步等待所有 block 一起在继续下一个数字之前接受它:

var broadcaster = new ActionBlock<int>(async num => 
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);

现在,在等待之后没有工作的情况下,您可以稍微优化一下,同时仍然返回一个等待任务:

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);

关于c# - 替代具有保证交付的 Dataflow BroadcastBlock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25082064/

相关文章:

c# - Process.HasExited 竞争条件

c# - 为什么在 C# .Net Core 3 中连接到 MySql 时出现错误

c# - 处理一批项目,返回一个对象来报告状态

c# - 为什么这个任务会立即退出?

c# - 仅运行当前任务和最近排队的任务

C# 4 和 CLR 兼容性

java - 从 Runnable 重新抛出已检查的异常

java - RxJava2 PublishSubject 订阅者在使用 SingleScheduler 从多个线程调用时无法接收项目

c# - 需要一种干净的方式在 C# 中的两个执行线程之间进行握手

c# - 为什么 block 按此顺序运行?