c# - 以固定或最小间隔处理 Rx 事件

标签 c# .net system.reactive

我有一系列事件,每 10-1000 毫秒发生一次。我订阅了这个事件源,但想以 500 毫秒的固定(或最小)间隔处理它们。 我还想一次处理一个事件,而不是分批处理(比如 Buffer(x > 1))。

伪代码是这样的:

observable.MinimumInterval(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...);

试过例如:

observable.Buffer(1).Delay(TimeSpan.FromMiliseconds(500).Subscribe(v=>...);

以及许多其他潜在的解决方案。到目前为止没有运气。

有什么想法吗?

最佳答案

我回答了这个问题on my blog here .

通过添加呈现作为扩展方法进行复制(以防链接失效!):

将 Rx 中的事件流限制为最大速率

有时,您想限制事件从 Rx 流到达的速率。

如果另一个事件在指定的时间间隔内到达,Throttle 运算符将抑制一个事件。这在很多情况下都非常有用,但它确实有两个重要的副作用——即使是未抑制的事件也会延迟间隔,如果事件到达得太快,它们将被完全丢弃。

我遇到过这两种情况都 Not Acceptable 情况。在这种特殊情况下,所需的行为如下:事件应以 TimeSpan 指定的最大速率输出,否则应尽快输出。

一个解决方案是这样的。想象一下我们的输入流是一群到达火车站的人。对于我们的输出,我们希望人们以最大速度离开车站。我们通过让每个人站在平板铁路卡车的前面并以固定速度将卡车送出车站来设置最大速率。因为只有一条轨道,而且所有卡车都以相同的速度和相同的长度行驶,所以当卡车背靠背发车时,人们将以最大速度离开车站。但是,如果轨道畅通,下一个人将能够立即离开。

那么我们如何将这个比喻转化为 Rx?

我们将使用 Concat 运算符的能力来接受流并将它们背靠背合并在一起——就像将铁路卡车送下轨道一样。

为了让每个人都等同于登上火车,我们将使用 Select 将每个事件(人)转换到一个可观察序列(火车),该序列以单个 OnNext 事件(人)开始并以OnComplete 正好是定义的时间间隔。

让我们假设输入事件是变量输入中的 IObservable。这是代码:

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

作为扩展方法,这变成了:

public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
    return source.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

}

关于c# - 以固定或最小间隔处理 Rx 事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21588625/

相关文章:

c# - 为什么 IObservables 链完全为每个订阅解析?

c# - 如何优化递归函数的 Reactive 实现

c# - Parallel.For 循环执行速度比 for 慢

c# - Datagridview复选框单元格更改按钮和画线的颜色

c# - 当我的播放器死机时,如何关闭特定音频?

.net - 构建同步框架 DAL 的正确方法

c# - 变量在循环中保留其值?

c# - 使用静态获取仅属性线程安全吗?

c# - 使用 GetHashCode 比较相同的匿名类型是否安全?

c# - 如何将 IObservable<IObservable<T>> 转换为 IObservable<IEnumerable<T>>?