c# - Why does my dataflow complete before all async calls from the BufferBlock have been fully processed?

Tags: c# task-parallel-library tpl-dataflow

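I have a dataflow that works as follows.

1. A task reads a text file in chunks and adds them to a BatchBlock<chunkSize>.

2. An ActionBlock linked to that BatchBlock partitions the data into batches and adds them to a BufferBlock.

3. A TransformBlock linked to the BufferBlock spawns an async task for each batch.

4. The process finishes when all the spawned async calls have finished.

The code below does not work as expected: it finishes before all the batches have been processed. What am I missing?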

private static void DataFlow(string filePath, int chunkSize, int batchSize)
{
    int chunkCount = 0;
    int batchCount = 0;

    BatchBlock<string> chunkBlock = new BatchBlock<string>(chunkSize);
    BufferBlock<IEnumerable<string>> batchBlock = new BufferBlock<IEnumerable<string>>();

    Task produceTask = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(filePath))
        {
            chunkBlock.Post(line);
        }

        Console.WriteLine("Finished producing");
        chunkBlock.Complete();
    });

    var makeBatches = new ActionBlock<string[]>(t =>
    {
        Console.WriteLine("Got a chunk  " + ++chunkCount);

        // Partition each chunk into smaller chunks grouped on column 1
        var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);

        // Further break down the chunks into batch-size groups
        var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

        // Get batches from groups
        var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

        foreach (var batch in batches)
        {
            batchBlock.Post(batch);
        }

        batchBlock.Complete();

    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    chunkBlock.LinkTo(makeBatches, new DataflowLinkOptions { PropagateCompletion = true });

    var executeBatches = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b =>
    {
        Console.WriteLine("Got a batch  " + ++batchCount);
        await ExecuteBatch(b);
        return b;

    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    batchBlock.LinkTo(executeBatches, new DataflowLinkOptions { PropagateCompletion = true });

    var finishBatches = new ActionBlock<IEnumerable<string>>(b =>
    {
        Console.WriteLine("Finished executing batch " + batchCount);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    executeBatches.LinkTo(finishBatches, new DataflowLinkOptions { PropagateCompletion = true });

    Task.WaitAll(produceTask);
    Console.WriteLine("Production complete");

    makeBatches.Completion.Wait();
    Console.WriteLine("Making batches complete");

    executeBatches.Completion.Wait();
    Console.WriteLine("Executing batches complete");

    Task.WaitAll(finishBatches.Completion);

    Console.WriteLine("Process complete with total chunks " + chunkCount + " and total batches " + batchCount);
    Console.ReadLine();
}

// async task to simulate network I/O
private static async Task ExecuteBatch(IEnumerable<string> batch)
{
    Console.WriteLine("Executing batch ");
    await Task.Run(() => System.Threading.Thread.Sleep(2000));
}

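Best answer

chunkBlock invokes makeBatches once for every chunk, and you call batchBlock.Complete() inside makeBatches. After the first chunk has been handled, batchBlock is therefore already completed and no longer accepts new posts: every later batchBlock.Post(batch) returns false and that batch is silently dropped.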
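One possible way to apply this is sketched below; it is not necessarily what the original poster ended up with. The idea is to remove the Complete() call from inside the makeBatches delegate and complete batchBlock only after makeBatches itself has completed. The chunk size of 4, the batch size of 2, the in-memory input lines and the Task.Delay stand-in for ExecuteBatch are all assumptions made to keep the example small, and it assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        int executed = 0;

        // Assumed sizes for illustration: chunks of 4 lines, batches of 2 lines.
        var chunkBlock = new BatchBlock<string>(4);
        var batchBlock = new BufferBlock<IEnumerable<string>>();

        var makeBatches = new ActionBlock<string[]>(chunk =>
        {
            // Simplified batching (no grouping by column) to keep the sketch short.
            for (int i = 0; i < chunk.Length; i += 2)
            {
                batchBlock.Post(chunk.Skip(i).Take(2).ToArray());
            }
            // Note: no batchBlock.Complete() here; this delegate runs once per chunk.
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        chunkBlock.LinkTo(makeBatches, new DataflowLinkOptions { PropagateCompletion = true });

        // Complete batchBlock only after makeBatches has processed every chunk.
        // This sketch does not forward faults from makeBatches.
        _ = makeBatches.Completion.ContinueWith(t => batchBlock.Complete());

        var executeBatches = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b =>
        {
            await Task.Delay(100); // stand-in for the question's ExecuteBatch
            return b;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        var finishBatches = new ActionBlock<IEnumerable<string>>(b =>
        {
            Interlocked.Increment(ref executed);
        });

        batchBlock.LinkTo(executeBatches, new DataflowLinkOptions { PropagateCompletion = true });
        executeBatches.LinkTo(finishBatches, new DataflowLinkOptions { PropagateCompletion = true });

        // Produce 8 lines, i.e. 2 chunks, i.e. 4 batches.
        for (int i = 0; i < 8; i++)
        {
            chunkBlock.Post("line " + i);
        }
        chunkBlock.Complete();

        // Waiting on the last block is enough, since completion propagates down the chain.
        await finishBatches.Completion;
        Console.WriteLine("Batches executed: " + executed); // prints "Batches executed: 4"
    }
}
```

With the Complete() call left inside the delegate, as in the question, the same program would typically report fewer than 4 batches, because the batches from the second chunk are rejected by the already-completed batchBlock.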

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/32039002/
