Consider the following type, produced by an external system, as a continuous stream of data:
public class Point
{
    public decimal Bid;
    public decimal Ask;
    public string Currency;
}
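After a certain number of elements have arrived, I have to run some calculations on that data. For one calculation I need, for example, 10 points whose Currency has the same value, so Currency acts as the key. One group therefore looks like this: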
var point0 = new Point { Currency = "EUR", Bid = 12.5M, Ask = 10.5M };
var point1 = new Point { Currency = "EUR", Bid = 11.7M, Ask = 10.8M };
[...]
var point9 = new Point { Currency = "EUR", Bid = 13.5M, Ask = 11.5M };
So I thought BatchBlock would be the perfect fit for this kind of requirement.
var batchBlock = new BatchBlock<Point>(10);
// batchBlock.Post(...)
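For reference, a plain BatchBlock groups purely by arrival order: it emits an array as soon as it has collected batchSize items, whatever their Currency. The sketch below illustrates that with interleaved points; the PlainBatchDemo class is a hypothetical name, and it assumes the TPL Dataflow library (System.Threading.Tasks.Dataflow) is referenced.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Point
{
    public decimal Bid;
    public decimal Ask;
    public string Currency;
}

// Hypothetical demo class, not part of the question's code.
public static class PlainBatchDemo
{
    // A BatchBlock emits arrays of batchSize items in arrival order,
    // so interleaved currencies end up in the same batch.
    public static async Task<Point[][]> RunAsync()
    {
        var batchBlock = new BatchBlock<Point>(2);
        foreach (var currency in new[] { "EUR", "XAU", "EUR", "XAU" })
            batchBlock.Post(new Point { Currency = currency, Bid = 1m, Ask = 1m });

        var first = await batchBlock.ReceiveAsync();
        var second = await batchBlock.ReceiveAsync();
        return new[] { first, second };
    }
}
```

Both batches contain one EUR and one XAU point, which is exactly the mixing the question wants to avoid.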
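However, this doesn't let me group points of the same currency together, because the points don't arrive in order. So I figured I would need one BatchBlock per currency group.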
But how do I feed these BatchBlocks based on the currency? Do I need to create a custom dataflow block?
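One way to sketch the "one BatchBlock per currency" idea is a router ActionBlock that lazily creates a BatchBlock for each new Currency value and links all of them to a single shared target. This is a hypothetical sketch, not the approach taken in the edit or the answer below: the PerCurrencyBatching class, CreateRouter and RunDemoAsync are illustrative names, fault propagation is deliberately left out, and the TPL Dataflow library is assumed to be referenced.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Point
{
    public decimal Bid;
    public decimal Ask;
    public string Currency;
}

// Hypothetical helper: routes each Point to a BatchBlock dedicated to its currency.
public static class PerCurrencyBatching
{
    public static ActionBlock<Point> CreateRouter(int batchSize, ITargetBlock<Point[]> output)
    {
        // Touched only by the router delegate (one message at a time by default)
        // and, after the router has completed, by the continuation below.
        var batches = new Dictionary<string, BatchBlock<Point>>();

        var router = new ActionBlock<Point>(async point =>
        {
            if (!batches.TryGetValue(point.Currency, out var batch))
            {
                batch = new BatchBlock<Point>(batchSize);
                // No PropagateCompletion: several BatchBlocks share one target.
                batch.LinkTo(output);
                batches.Add(point.Currency, batch);
            }
            await batch.SendAsync(point);
        });

        // Completing a BatchBlock emits its partial batch; once every BatchBlock
        // has drained, the shared output is completed. Faults are not forwarded.
        router.Completion.ContinueWith(async _ =>
        {
            foreach (var batch in batches.Values) batch.Complete();
            await Task.WhenAll(batches.Values.Select(b => b.Completion));
            output.Complete();
        }, TaskScheduler.Default);

        return router;
    }

    // Five interleaved points with batchSize = 2: one full EUR batch, one full
    // XAU batch, and the leftover EUR point flushed on completion.
    public static async Task<List<Point[]>> RunDemoAsync()
    {
        var results = new List<Point[]>();
        var collector = new ActionBlock<Point[]>(batch => results.Add(batch));
        var router = CreateRouter(2, collector);

        foreach (var currency in new[] { "EUR", "XAU", "EUR", "XAU", "EUR" })
            await router.SendAsync(new Point { Currency = currency, Bid = 1m, Ask = 1m });

        router.Complete();
        await collector.Completion;
        return results;
    }
}
```

The design choice here is to keep the per-currency BatchBlocks private to the router, so callers only see one input and one output; the trade-off is that completion of the shared target has to be coordinated by hand, since many sources feed it.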
[EDIT]
I had to come up with a custom dataflow block and solved it with DataflowBlock.Encapsulate(). The solution seems to batch/group the incoming Point instances.
However, I'm not sure whether this is the intended way to do it... I also still have to think about error handling...
private static IPropagatorBlock<Point, Point[]> CreateSorterBlock(int batchSize)
{
    var data = new Dictionary<string, List<Point>>();
    var outgoing = new BufferBlock<Point[]>();
    var sorter = new Action<Point>(point =>
    {
        // This currency has never been seen
        if (!data.ContainsKey(point.Currency))
        {
            data.Add(point.Currency, new List<Point>());
            data[point.Currency].Add(point);
        }
        // Other points have already been collected, so add
        // to the list of points
        else
        {
            data[point.Currency].Add(point);
        }
        // batch is full, so let's send it out
        if (data[point.Currency].Count == batchSize)
        {
            outgoing.Post(data[point.Currency].ToArray());
            data.Remove(point.Currency);
        }
    });
    var incoming = new ActionBlock<Point>(sorter);
    return DataflowBlock.Encapsulate(incoming, outgoing);
}
Test:
[Fact]
public void Should_Batch_And_Group_Messages()
{
    var rand = new Random();
    var feederBlock = new BufferBlock<Point>();
    var sorterBlock = CreateSorterBlock(3);
    var resultBlock = new ActionBlock<Point[]>(points =>
    {
        foreach (var point in points)
        {
            this.output.WriteLine($"Currency: {point.Currency}"
                + $" Ask: {point.Ask} Bid: {point.Bid}");
        }
        this.output.WriteLine($"End of resultBlock");
    });
    feederBlock.LinkTo(sorterBlock);
    sorterBlock.LinkTo(resultBlock);
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), Currency = "EUR/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), Currency = "XAU/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), Currency = "EUR/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), Currency = "XAU/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), Currency = "EUR/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), Currency = "XAU/USD" });
    feederBlock.Post(new Point{
        Ask = rand.Next(), Bid = rand.Next(), Currency = "XPT/USD" });
}
Best answer
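Your idea of using the DataflowBlock.Encapsulate method is great, but to make it complete you have to handle the Completion of the incoming block. Otherwise the resulting block never completes, and the remaining points are never consumed.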
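The effect is easy to reproduce: the Completion of an encapsulated block is the Completion of its output block, while calling Complete() on it only completes the input block. The hypothetical demo below (EncapsulateCompletionDemo and CompletesWithinAsync are illustrative names; the TPL Dataflow library is assumed) builds the same tiny pipeline with and without forwarding the completion, and reports whether Completion finishes before a timeout.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: the same encapsulated block with and without forwarding
// the incoming block's completion to the outgoing block.
public static class EncapsulateCompletionDemo
{
    public static async Task<bool> CompletesWithinAsync(bool forwardCompletion, int timeoutMs)
    {
        var outgoing = new BufferBlock<int[]>();
        var incoming = new ActionBlock<int>(x => outgoing.Post(new[] { x }));

        if (forwardCompletion)
        {
            // Complete the output side once the input side has finished.
            incoming.Completion.ContinueWith(_ => outgoing.Complete(), TaskScheduler.Default);
        }

        var block = DataflowBlock.Encapsulate(incoming, outgoing);
        block.Post(42);
        await block.ReceiveAsync(); // drain the BufferBlock so it has nothing pending
        block.Complete();           // this only completes the incoming ActionBlock

        var first = await Task.WhenAny(block.Completion, Task.Delay(timeoutMs));
        return first == block.Completion;
    }
}
```

Without the continuation, block.Completion stays pending indefinitely, so anything awaiting it (or linked to it with PropagateCompletion) would hang.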
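I improved your code by using SendAsync instead of Post, so that the resulting block keeps working correctly in case you later decide to limit its capacity by configuring the BoundedCapacity option.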
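The difference matters once a block is bounded: Post returns false and the item is lost when the target is full, whereas SendAsync returns a task that stays pending until the target has room and accepts the item. A minimal sketch with a BufferBlock of capacity 1 (PostVsSendAsyncDemo is a hypothetical name; the TPL Dataflow library is assumed):

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo of Post vs SendAsync against a block with BoundedCapacity = 1.
public static class PostVsSendAsyncDemo
{
    public static async Task<(bool FirstPost, bool SecondPost, bool SendWasPending, bool SendAccepted, int Next)> RunAsync()
    {
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = buffer.Post(1);   // accepted: there is room
        bool secondPost = buffer.Post(2);  // rejected: the block is full, item 2 is lost

        Task<bool> send = buffer.SendAsync(3); // postponed instead of rejected
        bool sendWasPending = !send.IsCompleted;

        await buffer.ReceiveAsync();           // frees the slot (returns 1)
        bool sendAccepted = await send;        // the block now consumes item 3
        int next = await buffer.ReceiveAsync();

        return (firstPost, secondPost, sendWasPending, sendAccepted, next);
    }
}
```

In the answer's block, awaiting outgoing.SendAsync inside the ActionBlock therefore gives you back-pressure instead of silently dropped batches if outgoing is ever bounded.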
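The MaxDegreeOfParallelism option should not be configured, because the code running in the incoming block is not thread-safe. Parallelism isn't needed anyway: the block's workload is very small, and adding thread safety would probably make it slower rather than faster.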
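The reason the non-thread-safe Dictionary is fine is that ExecutionDataflowBlockOptions.MaxDegreeOfParallelism defaults to 1, so an ActionBlock runs one message at a time, and setting only BoundedCapacity does not change that. The hypothetical probe below (ParallelismDemo and MaxObservedConcurrencyAsync are illustrative names; the TPL Dataflow library is assumed) records the highest number of delegate invocations seen running at once.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical probe, not part of the answer's code.
public static class ParallelismDemo
{
    // Runs 20 short async items through an ActionBlock and reports the highest
    // number of delegate invocations observed running at the same time.
    public static async Task<int> MaxObservedConcurrencyAsync(ExecutionDataflowBlockOptions options)
    {
        int current = 0, max = 0;
        var block = new ActionBlock<int>(async _ =>
        {
            int now = Interlocked.Increment(ref current);
            int seen;
            // Lock-free "max = Math.Max(max, now)"
            while (now > (seen = Volatile.Read(ref max))
                   && Interlocked.CompareExchange(ref max, now, seen) != seen) { }
            await Task.Delay(5);
            Interlocked.Decrement(ref current);
        }, options);

        // SendAsync, not Post, so no item is dropped when BoundedCapacity is set
        for (int i = 0; i < 20; i++) await block.SendAsync(i);
        block.Complete();
        await block.Completion;
        return max;
    }
}
```

With default options, or with only BoundedCapacity set, the probe should report 1: the delegate never overlaps with itself, even across await points.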
public static IPropagatorBlock<Point, Point[]> CreateBatchByCurrencyBlock(
    int batchSize)
{
    var grouped = new Dictionary<string, List<Point>>(
        StringComparer.OrdinalIgnoreCase);
    var outgoing = new BufferBlock<Point[]>();
    var incoming = new ActionBlock<Point>(async point =>
    {
        // Find (or create) the list that collects this currency's points
        List<Point> list;
        if (!grouped.TryGetValue(point.Currency, out list))
        {
            list = new List<Point>();
            grouped.Add(point.Currency, list);
        }
        list.Add(point);
        // The batch is full: send it out and start collecting again
        if (list.Count >= batchSize)
        {
            await outgoing.SendAsync(list.ToArray()).ConfigureAwait(false);
            list.Clear();
        }
    });
    incoming.Completion.ContinueWith(async t =>
    {
        if (t.Status == TaskStatus.RanToCompletion)
        {
            // Flush the leftover, incomplete batches (skip empty lists)
            foreach (var list in grouped.Values)
            {
                if (list.Count > 0)
                {
                    await outgoing.SendAsync(list.ToArray())
                        .ConfigureAwait(false);
                    list.Clear();
                }
            }
        }
        else if (t.IsFaulted)
        {
            // Forward the error to the output side of the block
            ((ITargetBlock<Point[]>)outgoing).Fault(t.Exception.InnerException);
        }
        outgoing.Complete();
    }, default, TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
    return DataflowBlock.Encapsulate(incoming, outgoing);
}
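To see the completion handling at work, the sketch below replays the question's test data (batch size 3) through the block, links it to the result block with PropagateCompletion, and awaits the result block instead of returning right after posting. The method is repeated (with the leftover check written as list.Count > 0) so the snippet compiles on its own; BatchByCurrencyDemo and RunAsync are hypothetical names, and the TPL Dataflow library is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Point
{
    public decimal Bid;
    public decimal Ask;
    public string Currency;
}

public static class BatchByCurrencyDemo
{
    // Same logic as CreateBatchByCurrencyBlock, repeated so this snippet compiles alone.
    public static IPropagatorBlock<Point, Point[]> CreateBatchByCurrencyBlock(int batchSize)
    {
        var grouped = new Dictionary<string, List<Point>>(StringComparer.OrdinalIgnoreCase);
        var outgoing = new BufferBlock<Point[]>();
        var incoming = new ActionBlock<Point>(async point =>
        {
            if (!grouped.TryGetValue(point.Currency, out var list))
            {
                list = new List<Point>();
                grouped.Add(point.Currency, list);
            }
            list.Add(point);
            if (list.Count >= batchSize)
            {
                await outgoing.SendAsync(list.ToArray()).ConfigureAwait(false);
                list.Clear();
            }
        });
        incoming.Completion.ContinueWith(async t =>
        {
            if (t.Status == TaskStatus.RanToCompletion)
            {
                foreach (var list in grouped.Values)
                {
                    if (list.Count > 0) // flush only non-empty leftovers
                    {
                        await outgoing.SendAsync(list.ToArray()).ConfigureAwait(false);
                        list.Clear();
                    }
                }
            }
            else if (t.IsFaulted)
            {
                ((ITargetBlock<Point[]>)outgoing).Fault(t.Exception.InnerException);
            }
            outgoing.Complete();
        }, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        return DataflowBlock.Encapsulate(incoming, outgoing);
    }

    // Replays the question's test data with batchSize = 3 and waits for the pipeline to finish.
    public static async Task<List<Point[]>> RunAsync()
    {
        var results = new List<Point[]>();
        var batcher = CreateBatchByCurrencyBlock(3);
        var resultBlock = new ActionBlock<Point[]>(points => results.Add(points));
        batcher.LinkTo(resultBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var pair in new[] { "EUR/USD", "XAU/USD", "EUR/USD", "XAU/USD", "EUR/USD", "XAU/USD", "XPT/USD" })
            await batcher.SendAsync(new Point { Currency = pair, Bid = 1m, Ask = 1m });

        batcher.Complete();           // lets the leftover XPT/USD point out
        await resultBlock.Completion; // finishes once the batcher has completed and drained
        return results;
    }
}
```

The expected result is three batches: EUR/USD and XAU/USD with three points each, and XPT/USD with its single point, which only appears because completion flushes the leftovers.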
Regarding "c# - How to group data when using BatchBlocks?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57161088/