c# - 并行处理 'stream' 元素同时保持输出有序的好方法

标签 c# .net parallel-processing task-parallel-library stream-processing

我有一个从 Kafka 接收 XML 事件流的应用程序。在按顺序传递给某些业务逻辑之前,必须对这些事件进行反序列化/解析和其他转换。 (此逻辑然后在输出端发出其他事件)。

解析/转换代码是无状态的,而域代码是有状态的并且必须按顺序接收事件。这两个步骤通过使用 System.Threading Channel 进行解耦,以便解析步骤获得完整的“线程”/“cpu”(异步任务)。

我的挑战是解析需要大量 CPU 资源,并且它在一个核心上达到 100% CPU,从而成为服务吞吐量的瓶颈。我尝试使用多线程/并行处理,这在一定程度上提高了吞吐量。然而我的方法似乎不优雅,并且可能会产生大量开销。

在解析步骤中,我使用 Task.Run() 为每个“项目”生成一个任务,然后将该任务添加到输出队列中,确保根据输入顺序添加任务。然后,消费者一次从 Channel 中提取一个任务,并等待其完成并获得结果,然后再继续。

这意味着我正在创建并提交大量任务,并且总的来说,我似乎在热路径中使用了大量线程协调操作。

希望这里有人能有一个好的方法来按顺序处理项目,同时尊重输出的顺序。

最佳答案

所以你有一个Channel<Task<T>>作为传送带,生产者通过 channel.Writer.TryWrite(Task.Run(() => Parse(item))) 添加任务,消费者读取任务并一个接一个地等待它们:

await foreach (Task<T> task in channel.Reader.ReadAllAsync())
{
    T result = await task;
    // Do something with the result
}

这是一个非常好的设置。缺点是您无法控制并行度。所以在某些时候你可能有太多 Task.Run并行运行的操作,导致 ThreadPool 饥饿,这可能会对应用程序的其他部分产生负面影响。您可以通过使用更高级的 Task.Factory.StartNew 安排工作来解决此问题。而不是 Task.Run ,并配置scheduler ConcurrentScheduler 的争论共享的属性(property) ConcurrentExclusiveSchedulerPair 实例。

另一种方法是将 channel 替换为 TransformBlock<TInput,TOutput> 来自 TPL 数据流库。该组件结合了输入缓冲区、输出缓冲区和转换 TInput 的处理器。至TOutput 。它开箱即用,具有并行功能和订单保存功能。这是一个例子:

TransformBlock<Item, Result> block = new(item =>
{
    return Parse(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2, // Configurable, the default is 1
    EnsureOrdered = true, // This is the default
});

生产者向区 block 提供 block.Post(item) ,消费者使用 ReceiveAllAsync 枚举该 block 的输出缓冲区方法:

await foreach (var result in block.ReceiveAllAsync())
{
    // Do something with the result
}
await block.Completion;

await block.Completion;最后需要,因为 ReceiveAllAsync方法目前有a bug ,并且不会将可能的异常作为枚举的一部分传播。

我的期望是 TransformBlock与当前设置相比,该方法应该具有更少的开销,并且消耗更少的内存。 TPL Dataflow library微软宣称适合“粗粒度数据流和管道任务”。这意味着您的 Parse方法应该是粗略的。如果它是轻量级的,比如解析单个数字,那么并行化的好处很可能会被同步开销所抵消。在这种情况下,解决方案可能是使用 BatchBlock<T> 将工作分块。 .

TPL Dataflow 库并不完全是尖端技术。它早于 ValueTask s,所以它没有利用它们。它还具有一些怪癖,例如吞咽OperationCanceledException可能由 transform 抛出代表。扩展起来也非常困难。虽然它应该比您已有的更好,但它不是绝对最佳的解决方案,但它可能足以满足您的需求。

关于c# - 并行处理 'stream' 元素同时保持输出有序的好方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75816806/

相关文章:

c# - 来自 ModelState.AddModelError() 的错误消息没有出现

c# - 为 RESTful 服务格式化 DateTime 的最佳方式?

c# - Socket.Send,服务器收到很多无效的 MPacket

algorithm - 哪些类型/类别的算法可以在 MapReduce 范例中重铸?

java - Java 中并行流的实用用例有哪些?

c# - 在 .NET 中获取当前用户的电子邮件地址

c# - TransactionScope Complete() 在退出 USING 语句之前不提交事务

.net - PrintDocument_PrintPage 和 Graphics.DrawImage 大小(以英寸为单位)

c# - 如何确定给定日期是否是该月的第 N 个工作日?

java - 并行化快速排序使其变慢