我有一个从 Kafka 接收 XML 事件流的应用程序。在按顺序传递给某些业务逻辑之前,必须对这些事件进行反序列化/解析和其他转换。 (此逻辑然后在输出端发出其他事件)。
解析/转换代码是无状态的,而域代码是有状态的并且必须按顺序接收事件。这两个步骤通过使用 System.Threading Channel 进行解耦,以便解析步骤获得完整的“线程”/“cpu”(异步任务)。
我的挑战是解析需要大量 CPU 资源,并且它在一个核心上达到 100% CPU,从而成为服务吞吐量的瓶颈。我尝试使用多线程/并行处理,这在一定程度上提高了吞吐量。然而我的方法似乎不优雅,并且可能会产生大量开销。
在解析步骤中,我使用 Task.Run() 为每个“项目”生成一个任务,然后将该任务添加到输出队列中,确保根据输入顺序添加任务。然后,消费者一次从 Channel 中提取一个任务,并等待其完成并获得结果,然后再继续。
这意味着我正在创建并提交大量任务,并且总的来说,我似乎在热路径中使用了大量线程协调操作。
希望这里有人能有一个好的方法来按顺序处理项目,同时尊重输出的顺序。
最佳答案
所以你有一个Channel<Task<T>>
作为传送带,生产者通过 channel.Writer.TryWrite(Task.Run(() => Parse(item)))
添加任务,消费者读取任务并一个接一个地等待它们:
await foreach (Task<T> task in channel.Reader.ReadAllAsync())
{
T result = await task;
// Do something with the result
}
这是一个非常好的设置。缺点是您无法控制并行度。所以在某些时候你可能有太多 Task.Run
并行运行的操作,导致 ThreadPool
饥饿,这可能会对应用程序的其他部分产生负面影响。您可以通过使用更高级的 Task.Factory.StartNew
安排工作来解决此问题。而不是 Task.Run
,并配置scheduler
与 ConcurrentScheduler
的争论共享的属性(property) ConcurrentExclusiveSchedulerPair
实例。
另一种方法是将 channel 替换为 TransformBlock<TInput,TOutput>
来自 TPL 数据流库。该组件结合了输入缓冲区、输出缓冲区和转换 TInput
的处理器。至TOutput
。它开箱即用,具有并行功能和订单保存功能。这是一个例子:
TransformBlock<Item, Result> block = new(item =>
{
return Parse(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2, // Configurable, the default is 1
EnsureOrdered = true, // This is the default
});
生产者向区 block 提供 block.Post(item)
,消费者使用 ReceiveAllAsync
枚举该 block 的输出缓冲区方法:
await foreach (var result in block.ReceiveAllAsync())
{
// Do something with the result
}
await block.Completion;
await block.Completion;
最后需要,因为 ReceiveAllAsync
方法目前有a bug ,并且不会将可能的异常作为枚举的一部分传播。
我的期望是 TransformBlock
与当前设置相比,该方法应该具有更少的开销,并且消耗更少的内存。 TPL Dataflow library微软宣称适合“粗粒度数据流和管道任务”。这意味着您的 Parse
方法应该是粗略的。如果它是轻量级的,比如解析单个数字,那么并行化的好处很可能会被同步开销所抵消。在这种情况下,解决方案可能是使用 BatchBlock<T>
将工作分块。 .
TPL Dataflow 库并不完全是尖端技术。它早于 ValueTask
s,所以它没有利用它们。它还具有一些怪癖,例如吞咽OperationCanceledException
可能由 transform
抛出代表。扩展起来也非常困难。虽然它应该比您已有的更好,但它不是绝对最佳的解决方案,但它可能足以满足您的需求。
关于c# - 并行处理 'stream' 元素同时保持输出有序的好方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75816806/