我有以下场景:
为了给出更清晰的想法,有一个模式代表了前面几点中描述的流程:
请注意,SSid 大于 100 的新项目不会被推送到消费者队列,因为其他队列中还没有相应的项目。
您能否建议一种使用 .NET TPL Dataflow 或 Rx.NET 创建这种同步的方法?到目前为止,我一直使用 TPL Dataflow 来实现简单的顺序管道,我想获得有关如何处理此场景的反馈。
提前感谢您的任何建议。
最佳答案
怎么样
像这样:
var syncedProducers =
// ConnectedProducersEvent ticks an array of connected producers, each time a producer connects or disconnects
ConnectedProducersEvent
.SelectMany(producers =>
Observable
.Merge(producers) // Put all objects, from all producers into the same observable
.GroupBy(@object => @object.SSId) // Group objects by matching SSId
.SelectMany(group => group.Buffer(producers.Length))); // Syncing: Emit the SSId group, when the group count matches the count of connected producers
// Now you can wire syncedProducers to consumers
var consumer1 =
syncedProducers
.Select(x => x.Where(y => y.Producer == 1));
You can run the example on dotnetfiddle
关于task-parallel-library - 具有分离队列的多生产者多消费者数据同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57359033/