我有一个由几个块组成的数据流管道。
当元素流经我的处理管道时,我想按字段 A
将它们分组。为此,我有一个 BatchBlock
高 BoundedCapacity
。在其中我存储我的元素,直到我决定它们应该被释放。所以我调用了 TriggerBatch()
方法。
private void Forward(TStronglyTyped data)
{
if (ShouldCreateNewGroup(data))
{
GroupingBlock.TriggerBatch();
}
GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
这是它的样子。
问题是,生产的批次有时包含下一个发布的元素,它不应该在那里。
为了显示:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
在这一点上,我希望我的批次是
{A,A,A}
,但它是 {A,A,A,B}
就像
TriggerBatch()
是异步的,而 SendAsync
实际上是在实际制作批处理之前执行的。我该如何解决这个问题?
我显然不想把
Task.Wait(x)
放在那里(我试过,它有效,但性能很差,当然)。
最佳答案
我还尝试在错误的位置调用 TriggerBatch
时遇到了这个问题。如前所述,使用 DataflowBlock.Encapsulate
的 SlidingWindow 示例是这里的答案,但需要一些时间来适应,所以我想我会分享我完成的块。
我的 ConditionalBatchBlock
创建批次达到最大尺寸,如果满足特定条件,可能会更快。在我的特定场景中,我需要创建 100 个批次,但在检测到数据发生某些变化时总是创建新批次。
public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
var queue = new Queue<T>();
var source = new BufferBlock<T[]>();
var target = new ActionBlock<T>(async item =>
{
// start a new batch if required by the condition
if (condition(queue, item))
{
await source.SendAsync(queue.ToArray());
queue.Clear();
}
queue.Enqueue(item);
// always send a batch when the max size has been reached
if (queue.Count == batchSize)
{
await source.SendAsync(queue.ToArray());
queue.Clear();
}
});
// send any remaining items
target.Completion.ContinueWith(async t =>
{
if (queue.Any())
await source.SendAsync(queue.ToArray());
source.Complete();
});
return DataflowBlock.Encapsulate(target, source);
}
在您的情况下,
condition
参数可能更简单。我需要查看队列以及当前项目来决定是否创建新批次。我是这样用的:
public async Task RunExampleAsync<T>()
{
var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));
var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));
conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
await ReadDataAsync<T>(conditionalBatchBlock);
await actionBlock.Completion;
}
关于c# - BatchBlock 使用在 TriggerBatch() 之后发送的元素生成批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35894878/