我有一个数据流如下。
1. 以 block 的形式读取文本文件并将它们添加到 BatchBlock<chunkSize>
的任务
2.一个ActionBlock
链接到上面的 BatchBlock
将数据分成批处理并将它们添加到 BufferBlock
3.一个TransformationBlock
链接到 BufferBlock
,它产生一个 async
每批任务
4. 当所有 spanwed async
通话结束。
以下代码未按预期工作。它在处理完所有批处理之前完成。我错过了什么?
private static void DataFlow(string filePath, int chunkSize, int batchSize)
{
int chunkCount = 0;
int batchCount = 0;
BatchBlock<string> chunkBlock = new BatchBlock<string>(chunkSize);
BufferBlock<IEnumerable<string>> batchBlock = new BufferBlock<IEnumerable<string>>();
Task produceTask = Task.Factory.StartNew(() =>
{
foreach (var line in File.ReadLines(filePath))
{
chunkBlock.Post(line);
}
Console.WriteLine("Finished producing");
chunkBlock.Complete();
});
var makeBatches = new ActionBlock<string[]>(t =>
{
Console.WriteLine("Got a chunk " + ++chunkCount);
// Partition each chunk into smaller chunks grouped on column 1
var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further beakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
foreach (var batch in batches)
{
batchBlock.Post(batch);
}
batchBlock.Complete();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
chunkBlock.LinkTo(makeBatches, new DataflowLinkOptions { PropagateCompletion = true });
var executeBatches = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b =>
{
Console.WriteLine("Got a batch " + ++batchCount);
await ExecuteBatch(b);
return b;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
batchBlock.LinkTo(executeBatches, new DataflowLinkOptions { PropagateCompletion = true });
var finishBatches = new ActionBlock<IEnumerable<string>>(b =>
{
Console.WriteLine("Finised executing batch" + batchCount);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
executeBatches.LinkTo(finishBatches, new DataflowLinkOptions { PropagateCompletion = true });
Task.WaitAll(produceTask);
Console.WriteLine("Production complete");
makeBatches.Completion.Wait();
Console.WriteLine("Making batches complete");
executeBatches.Completion.Wait();
Console.WriteLine("Executing batches complete");
Task.WaitAll(finishBatches.Completion);
Console.WriteLine("Process complete with total chunks " + chunkCount + " and total batches " + batchCount);
Console.ReadLine();
}
// async task to simulate network I/O
private static async Task ExecuteBatch(IEnumerable<string> batch)
{
Console.WriteLine("Executing batch ");
await Task.Run(() => System.Threading.Thread.Sleep(2000));
}
最佳答案
chunkBlock
正在为每个 block 调用 makeBatches
,并且您正在 makeBatches
中调用 batchBlock.Complete()
>,所以在第一批之后它就不再接受新帖子了。
关于c# - 为什么在从 BufferBlock 完全处理所有异步调用之前我的数据流就完成了?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32039002/