c# - 使用 BatchBlocks 时如何对数据进行分组?

标签 c# tpl-dataflow

将外部系统生成的以下类型视为连续数据流;

public class Point 
{
    public decimal Bid;
    public decimal Ask; 
    public string Currency; 
}

收到特定数量的元素后,我必须根据该数据进行一些计算。因此,为了进行计算,我需要例如 10 个点,其中 Currency 包含相同的值并充当键。所以一组看起来像这样:

var point0 = new Point { Currency = "EUR", Bid = 12.5M, Ask = 10.5M }
var point1 = new Point { Currency = "EUR", Bid = 11.7M, Ask = 10.8M }
[...]
var point9 = new Point { Currency = "EUR", Bid = 13.5M, Ask = 11.5M }

所以我认为 BatchBlock 是满足此类需求的完美选择。

var batchBlock = new BatchBlock<Point>(10); 
// batchBlock.Post(...) 

然而,这不允许我将同一货币组的点分组在一起。积分不会按顺序排列。所以我认为每个货币组都需要一个 BatchBlock

但是我如何根据货币喂养这个 BatchBlocks?我需要创建自定义数据流 block 吗?


[编辑]

我必须想出一个自定义数据流 block 并使用 DataflowBlock.Encapsulate() 解决它。该解决方案似乎可以对 Point 的传入实例进行批处理/分组。

但是我不确定这是否是执行此操作的意图方式...我还必须考虑错误处理...

private static IPropagatorBlock<Point, Point[]> CreateSorterBlock(int batchSize)
{
    var data = new Dictionary<string, List<Point>>();
    var outgoing = new BufferBlock<Point[]>();
    var sorter = new Action<Point>(point =>
    {
        // This currencyPair has never been seen
        if (!data.ContainsKey(point.CurrencyPair))
        {
            data.Add(point.CurrencyPair, new List<Point>());
            data[point.CurrencyPair].Add(point);
        }
        // Other points have already been collected, so add 
        // to the list of points 
        else
        {
            data[point.CurrencyPair].Add(point);
        }

        // batch is full so lets send it out
        if (data[point.CurrencyPair].Count == batchSize)
        {
            outgoing.Post(data[point.CurrencyPair].ToArray());
            data.Remove(point.CurrencyPair);
        }
    });

    var incoming = new ActionBlock<Point>(sorter);

    return DataflowBlock.Encapsulate(incoming, outgoing);
}

测试:

[Fact]
public void Should_Batch_And_Group_Messages()
{
    var rand = new Random();

    var feederBlock = new BufferBlock<Point>();
    var sorterBlock = CreateSorterBlock(3);
    var resultBlock = new ActionBlock<Point[]>(points =>
    {
        foreach (var point in points)
        {
            this.output.WriteLine($"Currency: {point.CurrencyPair}"
                + $" Ask: {point.Ask} Bid: {point.Bid}");
        }
        this.output.WriteLine($"End of resultBlock");
    });

    feederBlock.LinkTo(sorterBlock);
    sorterBlock.LinkTo(resultBlock);

    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), CurrencyPair = "EUR/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), CurrencyPair = "XAU/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), CurrencyPair = "EUR/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), CurrencyPair = "XAU/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), CurrencyPair = "EUR/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), CurrencyPair = "XAU/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), CurrencyPair = "XPT/USD" });
}

最佳答案

您使用 DataflowBlock.Encapsulate 方法的想法很棒,但要使其完整,您必须处理传入 block 的 Completion。否则生成的区 block 永远不会完成,剩余的积分也不会被消耗。

我使用 SendAsync 改进了您的代码而不是 Post,这样生成的 block 将继续正常工作,以防您稍后决定通过配置 BoundedCapacity 来限制 block 的容量。选项。

MaxDegreeOfParallelism不应配置选项,因为在传入 block 中运行的代码不是线程安全的。无论如何都不需要并行性,因为 block 的工作量非常小,添加线程安全可能会使它慢而不是快。

public static IPropagatorBlock<Point, Point[]> CreateBatchByCurrencyBlock(
    int batchSize)
{
    var grouped = new Dictionary<string, List<Point>>(
        StringComparer.OrdinalIgnoreCase);
    var outgoing = new BufferBlock<Point[]>();
    var incoming = new ActionBlock<Point>(async point =>
    {
        List<Point> list;
        if (!grouped.TryGetValue(point.Currency, out list))
        {
            list = new List<Point>();
            grouped.Add(point.Currency, list);
        }
        list.Add(point);
        if (list.Count >= batchSize)
        {
            await outgoing.SendAsync(list.ToArray()).ConfigureAwait(false);
            list.Clear();
        }
    });

    incoming.Completion.ContinueWith(async t =>
    {
        if (t.Status == TaskStatus.RanToCompletion)
        {
            foreach (var list in grouped.Values)
            {
                if (list.Count >= 0)
                {
                    await outgoing.SendAsync(list.ToArray())
                        .ConfigureAwait(false);
                    list.Clear();
                }
            }
        }
        else if (t.IsFaulted)
        {
            ((ITargetBlock<Point[]>)outgoing).Fault(t.Exception.InnerException);
        }
        outgoing.Complete();
    }, default, TaskContinuationOptions.ExecuteSynchronously,
    TaskScheduler.Default);

    return DataflowBlock.Encapsulate(incoming, outgoing);
}

关于c# - 使用 BatchBlocks 时如何对数据进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57161088/

相关文章:

c# - ITargetBlock<TInput>.Completion.ContinueWith() 的最佳实践

c# - 将基于 C# BlockingCollection 的代码转换为 TPL 数据流

c# - 如何以(线程)安全的方式跟踪 TPL 管道中的故障项

c# - 服务器控件、客户端控件、面板、默认按钮和客户端事件

c# - 获取深度嵌入的 XML 元素值

c# - 如何从存储过程中获取返回值?

c# - 将 recaptcha(具有自定义外观)与 asp.net 集成

c# - TPL 数据流 block 下游如何获取源生成的数据?

c# - 最佳 TPL 数据流设计?

c# - 在 if 语句中实例化一个 int