c# - Rx.Net GroupBy 实现很少丢失元素序列

标签 c# .net system.reactive reactive-programming rx.net

我正在使用 RX 2.2.5。我有 2 个加载子订单的 View

           _transportService
            .ObserveSubOrder(parentOrder.OrderId)
            .SubscribeOn(_backgroundScheduler)
            .ObserveOn(_uiScheduler)
            .Where(subOs => subOs != null)                
            .Snoop("BeforeGrpBy")
            .GroupBy(subOs => subOs.OrderId)
            .Subscribe(subOrdUpdates =>
            {
                AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));                        
            })

在 groupBy 之前它获取所有元素序列,在 groupby 之后出现问题,它很少错过元素序列。我不认为它的并发问题,因为它从日志中很明显。自定义 Snoop 扩展方法用于生成这些日志。

16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})

Format Time : (Thread) : Msg

正如您所见,在 groupby onNext 之前被调用了两次,但在它错过了一次之后。 这里的 Rx 语法有问题还是已知问题?任何见解都会有所帮助吗?如果需要任何进一步的说明,请发表评论。

更新: 添加工作/所需日志:

16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})

更新 2: 可能的错误或功能

只有当 fireNewMapEntry 为真时,GroupBy 才会触发 groupedObservable,(GroupBy.cs),这发生在这里

 if (!_map.TryGetValue(key, out writer))
 {
    writer = new Subject<TElement>();
    _map.Add(key, writer);
    fireNewMapEntry = true;
  }

其中 _map 的类型为 Dictionary<TKey, ISubject<TElement>> .这可能是问题所在吗?

最佳答案

只是一些关于你的代码风格的注释(抱歉,这不是我认为@supertopi 已经回答的真正答案)

  1. 感动你SubscribeOnObserveOn调用是您在最终订阅之前做的最后一件事。在您当前的代码中,您正在执行 Where , SnoopGroupBy所有在_uiScheduler占用宝贵的周期。

  2. 避免在订阅中订阅。看来 AddIfNew拿一把 key 和一个IObservable<T> ,因此我假设它正在内部执行一些订阅。而是依靠你所知道的。如果您使用的是 GroupBy,那么您知道在第一次生成组时 key 将是唯一的。所以这现在可以只是一个添加(如果它是您正在检查的 key )。您也可以使用 Take(1)如果你想明确。如果它是值而不是您正在检查的键,那么 GroupBy似乎是多余的。

  3. 尽量让您的变量名称保持一致,这样当另一个开发人员阅读查询时,他们会得到很好的引导,而不是在 subOs 之间跳转, childOschildUpdates , 当 childOrder似乎是一个更好的名字(imo)

  4. 理想情况下不要在您的可观察序列中返回空值。它有什么作用?在极少数情况下它可能有意义,但我经常发现使用 null 而不是 OnCompleted。表示此序列没有值。

例如

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .GroupBy(childOrder => childOrder.OrderId)
        .SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder))
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(newGroup =>
        {
            Add(newGroup.Item1, newGroup.Item2);                        
        },
          ex=>//obviously we have error handling here ;-)
        );

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(childOrder =>
          {
             AddIfNew(childOrder.OrderId, childOrder);                             
          },
          ex=>//obviously we have error handling here ;-)
        );

甚至更好(没有 snoop 和 null 检查)

var subscription = _transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(
          childOrder => AddIfNew(childOrder.OrderId, childOrder),
          ex=>//obviously we have error handling here ;-)
        );

第一个

关于c# - Rx.Net GroupBy 实现很少丢失元素序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40277280/

相关文章:

c# - 使用 WPF 将 WriteableBitmap 保存到文件

c# - 映射两个字符列表以使它们互惠

c# - 如何保留/存储我的 Azure 凭据?

c# - 并行处理一组 url 并返回一个 IEnumerable

c# - 接收 : Wait for several observables to complete

c# - 使用关联打开文件

c# - 为什么 C# 结构实例方法在结构字段上调用实例方法首先检查 ecx?

c# - WPF 中的回历和公历?

c# - 如何在一个 VS 解决方案中运行 NUnit 测试之前启动控制台应用程序?

android - 编写多个网络调用 RxJava - Android