task-parallel-library - 具有分离队列的多生产者多消费者数据同步

标签 task-parallel-library reactive-programming system.reactive tpl-dataflow rx.net

我有以下场景:

  • 可变数量(大于三个)的队列(取决于文件中设置的配置)
  • 其中一些队列可以被提供或不提供数据(这取决于通过网络客户端接收数据的生产者:客户端可以在同一 session 期间连接或不连接)
  • 这些队列以不同的速度馈送;因此,例如,Queue1 在给定时间可以有 10 个对象,而另一个队列 Queue2 在同一给定时间只能有 3 个对象
  • 这些队列中的对象必须根据所有对象共享的属性进行同步(一个名为“SSId”的 int 属性不断增加)
  • 只有在给定时刻接收数据的队列才必须进行同步(必须排除未连接的队列)
  • 当对象同步时,它们必须被推送到相关消费者使用的相应输出队列:每个生产者都与特定消费者相关联
  • 在上一步之后,每个消费者都能够同时处理具有相同“SSId”属性值的入队对象;
  • 因此,最终结果应该是一个系统,其中消费者能够以相同的速率处理数据(根据已经提到的“SSId”属性同步),即使每个生产者以不同的速度/速率生成数据

  • 为了给出更清晰的想法,有一个模式代表了前面几点中描述的流程:
    dataflow mesh

    请注意,SSid 大于 100 的新项目不会被推送到消费者队列,因为其他队列中还没有相应的项目。

    您能否建议一种使用 .NET TPL Dataflow 或 Rx.NET 创建这种同步的方法?到目前为止,我一直使用 TPL Dataflow 来实现简单的顺序管道,我想获得有关如何处理此场景的反馈。
    提前感谢您的任何建议。

    最佳答案

    怎么样

  • 将所有生产者的对象合并为一个可观察对象
  • 按 SSId 对对象进行分组
  • 当组大小等于生产者的数量时发出组(通过 .Buffer())

  • 像这样:

    var syncedProducers = 
        // ConnectedProducersEvent ticks an array of connected producers, each time a producer connects or disconnects
        ConnectedProducersEvent
            .SelectMany(producers => 
                Observable
                    .Merge(producers) // Put all objects, from all producers into the same observable
                    .GroupBy(@object => @object.SSId) // Group objects by matching SSId
                .SelectMany(group => group.Buffer(producers.Length))); // Syncing: Emit the SSId group, when the group count matches the count of connected producers
    
    // Now you can wire syncedProducers to consumers
    var consumer1 = 
        syncedProducers
            .Select(x => x.Where(y => y.Producer == 1));
    

    You can run the example on dotnetfiddle

    关于task-parallel-library - 具有分离队列的多生产者多消费者数据同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57359033/

    相关文章:

    spring - 为什么 Spring ReactiveMongoRepository 没有 Mono 的保存方法?

    c# - c# 中是否已有条件 Zip 函数?

    system.reactive - RxJs——每次事件爆发后重播所有事件

    c# - Task.Delay().Wait() 是怎么回事?

    c# - 如何仅在使用 TPL 时从自己的方法获取堆栈跟踪?

    java - 使用 Spring WebClient 时没有从端点接收到数据,但我可以使用curl 来获取数据

    c# - 使用 Rx,如何在我的订阅方法运行时忽略除最新值之外的所有值

    c# - 如何使用任务并行库管理任务列表

    c# - 太多任务导致 SQL 数据库超时

    java - 如何在准备就绪时使用响应式(Reactive) Flux/Mono 将消息推送到上游,而不是间隔轮询状态?