c# - 接收 : Pairing window duration with count of events raised inside the window

标签 c# aggregate system.reactive

我想使用 Rx 来计算 2 个事件流的统计信息。

输入流

//    stream1    --A---B----A-B-----A-----B----A--B|
//    stream2    ----X---X-----------X--X---XX---X--X|

中间结果

窗口持续时间,其中窗口在 A 上打开并在 B 上关闭以及在这些窗口内引发的 stream2 事件的计数

//    result     ------1------0-----------2-------1|    <-- count of stream2 events in [A-B] window
//                     4      2           6       3     <-- paired with window [A-B] window duration

最终结果

按 stream2 事件的计数对中间结果进行分组,并返回每组的窗口持续时间统计信息,例如平均、最小和最大窗口持续时间

//    output     -----------------------------------0    1     2|    <-- count of stream2 events in [A-B] window
//                                                  2   3.5    6     <-- average [A-B] window duration for that count of stream2 events.

Rx 查询

public enum EventKind
{
    START,
    STOP,
    OTHER
};

public struct Event1
{
    public EventKind  Kind;
    public DateTime   OccurenceTime;
};

var merge = stream1.Merge(stream2.Select(x => new Event1
                                        {
                                            Kind = EventKind.OTHER,
                                            OccurenceTime = x
                                        }))
           .RemoveDisorder(x => x.OccurenceTime, new TimeSpan(0,0,10));

var shared = merge.Publish().RefCount();

// Windows open on START and close on STOP
var windows = shared.Window(
            shared.Where(x => x.Kind == EventKind.START),
            opening => shared.Where(x => x.Kind == EventKind.STOP));

// For each window we're interested in the duration of the window along with
// the count of OTHER events that were raised inside the window
//
var pairs = windows.Select(window => new 
        {
            Duration = window
                .Where(x=>x.Kind != EventKind.OTHER) // we only want START & STOP events, not OTHER events
                .Buffer(2,1)                         // could use buffer(2) but this is more reliable if stream1 sometimes has multiple consecutive START events.
                .Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
                .Select(x => x[1].OccurenceTime - x[0].OccurenceTime), // compute the latency

            EventCount = window.Where(x=>x.Kind == EventKind.OTHER).Count() // count the number of OTHER events in the window
        }
    );

我想简化可观察类型

  • 来自 IObservable<{IObservable<int>, IObservable<TimeSpan>}>
  • IObservable<{int, TimeSpan}>

这应该是可能的,因为每个窗口都有恰好 1 个持续时间和 1 个其他事件计数。

在这一点上,定义按 EventCount 对窗口进行分组的输出查询应该不会太困难。并选择窗口持续时间的统计信息,例如每组的 Min、Max、Avg。

var result = pairs
        .GroupBy(pair => pair.EventCount)
        .Select(g => new 
            {
                EventCount = g.Key,
                Min = g.Min(x => x.Duration),
                Avg = g.Average(x => x.Duration),
                Max = g.Max(x => x.Duration)
            });

RemoveDisorder是我用来对 OccurenceTime 上的合并观察结果进行排序的扩展方法.我需要它,因为我的输入流不是实时事件(如本例中所示),而是通过 Tx 从日志中读取。合并 2 个排序流的输出本身不再排序。

最佳答案

在使用 Rx 一段时间后,您可能遇到的一个常见场景是关于启动和停止事件。要正确处理它,有多种方法,具体取决于您的要求。

如果您的问题只是数据投影,请检查@Brandon 解决方案,关键是以不同的方式组合,例如使用 SelectMany .如果你想保留 Select运营商有必要返回一个 IObservable<T>输入投影。

无论如何,我认为你的作文大体上有问题,我将在下面尝试说明。

使用 Window运算符,就像您所做的那样,如果在开始流中发生多个连续事件,它将创建多个组。在您的代码中可能会出现问题,因为主事件流将在下一个事件发生时处理多次。

这个例子只是为了向您展示许多组的创建:

var subject = new Subject<Event1>();
var shared = subject.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
var values = shared.Where(a => a.Kind == EventKind.OTHER);

values.Window(start, a => stop).Subscribe(inner => 
 { 
    Console.WriteLine("New Group Started");
    inner.Subscribe(next => 
                    { 
                        Console.WriteLine("Next = "+ next.Kind + " | " + next.OccurenceTime.ToLongTimeString());
                    }, () => Console.WriteLine("Group Completed"));
 });

subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now });
subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now.AddSeconds(1) });
subject.OnNext(new Event1 { Kind = EventKind.OTHER, OccurenceTime = DateTime.Now.AddSeconds(2) });
subject.OnNext(new Event1 { Kind = EventKind.STOP, OccurenceTime = DateTime.Now.AddSeconds(3) });

结果:

New Group Started
New Group Started
Next = OTHER | 4:55:46 PM
Next = OTHER | 4:55:46 PM
Group Completed
Group Completed

也许需要这种行为,否则就需要其他组合。为了“驯服”事件流,我看到了三种不同的方法:

  1. 仅从第一个开始事件开始计算,并忽略其他开始而不接收停止。 (例如:Create observable and consume only between events)。
  2. 使用最新的开始事件计算流,在这种情况下,先前的流将被忽略及其内部组合(可能使用 Switch 运算符)。
  3. 独立计算,考虑到每个开始事件都需要一个结束事件,允许在组合中创建多组流(对我来说根本没有意义,除非你有一个标识符来匹配开始和结束事件) .

要实现这些选项中的一个,一般来说,您可以使用多种不同的方法来实现。如果我理解你的问题,你正在寻找选项ONE。现在答案:

  1. 保持Window , 太多代码:
IObservable<Event1> sx= GetEventStream();
var shared = sx.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
shared.Window(start, a => stop)
    .Select(sx => 
            sx.Publish(b =>
                        b.Take(1)
                        .Select(c => 
                        {
                            var final = b.LastOrDefaultAsync().Select(a => a.OccurenceTime);
                            var comp = b.Where(d => d.Kind == EventKind.OTHER).Count();
                            return final.Zip(comp, (d,e) => new { Count = e, Time = d - c.OccurenceTime });
                        })
                        .Switch()   // whatever operator here there's no difference
                    )               // because is just 1
            )
    .Concat()
    .Subscribe(next => 
    { 
        Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
    });
  1. 使用 GroupByUntil ,一种“hack”,但这是我的偏好:

    IObservable<Event1> sx = GetEventStream();
    var shared = sx.Publish().RefCount();       
    var stop = shared.Where(a => a.Kind == EventKind.STOP).Publish().RefCount();
    var start = shared.Where(a => a.Kind == EventKind.START);       
    start.GroupByUntil(a => Unit.Default, a => stop)
            .Select(newGroup => 
            { 
                var creation = newGroup.Take(1);
                var rightStream = shared.Where(a => a.Kind == EventKind.OTHER)
                                        .TakeUntil(newGroup.LastOrDefaultAsync())
                                        .Count();
                var finalStream = stop.Take(1);
    
                return creation.Zip(rightStream, finalStream, (a,b,c) => new { Count = b, Time = c.OccurenceTime - a.OccurenceTime });
            })
            .Concat()
            .Subscribe(next => 
            { 
                Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
            });
    
  2. 不使用 Group/WindowTake(1)在作文的最后添加 Repeat运算符,但可能会导致意外行为,因为“重新订阅”(将取决于它是冷还是热可观察对象,以及使用的调度程序)。

  3. 创建一个声明您自己的扩展方法的自定义实现,并不像看起来那么难,可能是最好的选择,但需要一段时间才能实现。

您的作文的另一个问题是无法获得统计数据,因为您没有办法完成 GroupBy 中的每个新组。运营商。

我建议重新考虑您的方法,解决方案可能会以某种方式合并时间。有关统计信息和 Rx 的更多信息,请查看: http://www.codeproject.com/Tips/853256/Real-time-statistics-with-Rx-Statistical-Demo-App

关于c# - 接收 : Pairing window duration with count of events raised inside the window,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28331039/

相关文章:

c# - RX - 重新抛出包含方法的错误

c# - 在这种情况下可以覆盖 ToString() 方法吗

c# - MVVM 调度程序 UI 无响应

java - 降低数据集的粒度

c# - ISubject 在哪里?

f# - 通过 Rx 从 MailboxProcessor 返回结果是个好主意吗?

c# - ElementHost 大小导致 wpf 打开/加载速度慢且内存使用率高

c# - 寻找图像直方图中的峰值

c# - 聚合 IEnumerable 中的一系列连续 block

c# - NHibernate 聚合遍历 (C#)