我需要某种类似于 BroadcastBlock 的对象,但可以保证传送。所以我使用了 this question 的答案.但我并不是很清楚这里的执行流程。我有一个控制台应用程序。这是我的代码:
static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));
ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);
broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});
Task producer = Produce(broadcaster);
List<Task> ToWait = new List<Task>();
foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
ToWait.Add(producer);
Task.WaitAll(ToWait.ToArray());
Console.ReadLine();
}
static async Task Produce(ActionBlock<int> broadcaster)
{
for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);
broadcaster.Complete();
}
每个数字都必须按顺序处理,所以我不能在 broadcaster block 中使用 MaxDegreeOfParallelism。但是所有接收到数字的 Action block 都可以并行运行。
问题来了:
在输出中我可以看到不同的线程 ID。我理解正确吗,它的工作原理如下:
执行命中广播中的 await block.SendAsync(num);
。
如果当前 block 尚未准备好接受该号码,则执行退出广播程序并卡在 Task.WaitAll 处。
当 block 接受数字时,broadcaster 中的 foreach 语句的其余部分在线程池中执行。
直到最后都一样。
foreach 的每次迭代都在线程池中执行。但实际上它是顺序发生的。
我的理解是对还是错? 我如何更改此代码以将数字异步发送到所有 block ?
为了确保如果其中一个区 block 目前还没有准备好接收号码,我不会等待它,所有其他准备就绪的区 block 都会收到号码。并且所有 block 都可以并行运行。并保证交期。
最佳答案
假设您希望 broadcaster
一次处理一个项目,同时使目标 block 能够同时接收该项目,您需要更改 broadcaster
以提供数量同时到所有 block ,然后异步等待所有 block 一起在继续下一个数字之前接受它:
var broadcaster = new ActionBlock<int>(async num =>
{
var tasks = new List<Task>();
foreach (var block in blocks)
{
tasks.Add(block.SendAsync(num));
}
await Task.WhenAll(tasks);
}, execopt);
现在,在等待之后没有工作的情况下,您可以稍微优化一下,同时仍然返回一个等待任务:
ActionBlock<int> broadcaster = new ActionBlock<int>(
num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);
关于c# - 替代具有保证交付的 Dataflow BroadcastBlock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25082064/