C# react 扩展当 OnNext 花费很长时间并且可观察到产生新事件时会发生什么

标签 c# system.reactive reactive-programming

我是 Rx 的新手,我在想当 IObservable 非常快速地产生大量事件而 OnNext 需要很长时间时会发生什么。我想新事件在内部以某种方式排队,所以我可以运行我们的内存。我对吗?考虑以下小示例:

        Subject<int> subject = new Subject<int>();
        subject.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {      Console.WriteLine("Value published: {0}", x); Thread.Sleep(100); },
                     () => Console.WriteLine("Sequence Completed."));

        for (int i = 0; i < 10; i++)
        {
            subject.OnNext(i);
        }

我发布了 10 个事件,消费者方法非常慢。所有事件都经过处理,因此必须将其缓存在内存中。如果我发布很多事件,我会用完内存,对吗?还是我错过了什么?

有没有办法限制响应式扩展中未决事件的数量?例如,如果有超过 5 个未决事件,我想忽略新事件。

最佳答案

是的,你是对的,缓慢的消费者会导致排队。与您所要求的内容最接近的内置运算符是 Observable.Sample - 但是,这会丢弃较旧的事件以支持较新的事件。这是更常见的要求,因为它可以让你的慢速观察者 catch 来。

让我知道示例是否足够,因为您描述的缓冲行为是一个不寻常的要求 - 它是可以实现的,但实现起来非常复杂并且需要相当重要的代码。

编辑 - 如果您像这样使用 Sample,它将在每次 OnNext 之后返回最新的事件(您需要为此提供一个调度程序 - 并且 NewThreadScheduler 创建一个线程每个订阅,而不是每个事件:

void Main()
{
    var source = Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(100);

    source.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
        .Subscribe(SlowConsumer);

    Console.ReadLine();
}

private void SlowConsumer(long item)
{    
    Console.WriteLine(item + " " + Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(TimeSpan.FromSeconds(1));
}

关于C# react 扩展当 OnNext 花费很长时间并且可观察到产生新事件时会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20018503/

相关文章:

c# - 我应该在 SQL Server 还是 C# 中使用 "User Defined Functions"?

c# - C# 中的组合生成器

java - 如何使用 RxJava 加载相关对象

c# - 刷新一个控件

c# - 将查询字符串数组参数转换为字典值

c# - 响应式(Reactive)管道——如何控制并行度?

system.reactive - 对于 Buffer 等运算符来说,打开和关闭边界的含义是什么?

javascript - 如何让scan()的使用更加清晰

java - 响应式 JDK 8 中的 if/else

swift - 如何在 Swift Joint 中创建自定义链?