c# - BatchBlock 使用在 TriggerBatch() 之后发送的元素生成批处理

标签 c# task-parallel-library tpl-dataflow

我有一个由几个块组成的数据流管道。
当元素流经我的处理管道时,我想按字段 A 将它们分组。为此,我有一个 BatchBlockBoundedCapacity 。在其中我存储我的元素,直到我决定它们应该被释放。所以我调用了 TriggerBatch() 方法。

private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }

 GroupingBlock.SendAsync(data).Wait(SendTimeout);
}

这是它的样子。
问题是,生产的批次有时包含下一个发布的元素,它不应该在那里。

为了显示:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);

在这一点上,我希望我的批次是 {A,A,A} ,但它是 {A,A,A,B}
就像 TriggerBatch() 是异步的,而 SendAsync 实际上是在实际制作批处理之前执行的。

我该如何解决这个问题?
我显然不想把 Task.Wait(x) 放在那里(我试过,它有效,但性能很差,当然)。

最佳答案

我还尝试在错误的位置调用 TriggerBatch 时遇到了这个问题。如前所述,使用 DataflowBlock.Encapsulate 的 SlidingWindow 示例是这里的答案,但需要一些时间来适应,所以我想我会分享我完成的块。

我的 ConditionalBatchBlock 创建批次达到最大尺寸,如果满足特定条件,可能会更快。在我的特定场景中,我需要创建 100 个批次,但在检测到数据发生某些变化时总是创建新批次。

public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
    var queue = new Queue<T>();

    var source = new BufferBlock<T[]>();

    var target = new ActionBlock<T>(async item =>
    {
        // start a new batch if required by the condition
        if (condition(queue, item))
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }

        queue.Enqueue(item);

        // always send a batch when the max size has been reached
        if (queue.Count == batchSize)
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }
    });

    // send any remaining items
    target.Completion.ContinueWith(async t =>
    {
        if (queue.Any())
            await source.SendAsync(queue.ToArray());

        source.Complete();
    });

    return DataflowBlock.Encapsulate(target, source);
}

在您的情况下, condition 参数可能更简单。我需要查看队列以及当前项目来决定是否创建新批次。

我是这样用的:
public async Task RunExampleAsync<T>()
{
    var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));

    var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));

    conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await ReadDataAsync<T>(conditionalBatchBlock);

    await actionBlock.Completion;
}

关于c# - BatchBlock 使用在 TriggerBatch() 之后发送的元素生成批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35894878/

相关文章:

c# - 如何使用正则表达式在C#中查找和替换“@@ word @@”?

c# - 我应该在 Debugger.Break 或 Debug.WriteLine 之前检查 Debugger.IsAttached 吗?

c# - UnsafeQueueUserWorkItem 和 "does not propagate the calling stack"到底是什么意思?

c# - 将基于 C# BlockingCollection 的代码转换为 TPL 数据流

c# - 如何设置测试项目(命名空间、有用的辅助对象等)?

c# - 如何在 C# 中发起 HTTP 请求

c# - 非异步异步方法的命名约定是什么?

c# - Task.StartNew() 与 Parallel.ForEach : Multiple Web Requests Scenario

c# - BatchBlock的对面 block 是什么

c# - 下游 block 中的 TPL 数据流和异常处理