c# - 没有最终订阅者的 'Intermediate IObservables' 在根 IObservable 的生命周期内保留在内存中

标签 c# .net system.reactive idisposable

例如,考虑一下:

    public IDisposable Subscribe<T>(IObserver<T> observer)
    {
        return eventStream.Where(e => e is T).Cast<T>().Subscribe(observer);
    }

eventStream是长期存在的事件源。短暂的客户端将使用此方法订阅一段时间,然后通过调用 Dispose 取消订阅。在返回IDisposable .

但是,虽然 eventStream仍然存在,应该保留在内存中,有 2 个新的 IObservables通过此方法创建 - Where() 返回的那个eventStream 可能保存在内存中的方法,以及由 Cast<T>() 返回的那个Where() 返回的方法可能保存在内存中方法。

如何清理这些“中间 IObservables”(是否有更好的名称?)?或者它们现在会在 eventStream 的生命周期内存在吗?即使他们不再有订阅并且除了他们的来源之外没有其他人引用他们 IObservable因此永远不会再有订阅?

如果他们通过通知他们的 parent 他们不再有订阅而被清理,他们怎么知道没有其他人引用他们并且可能在以后的某个时候订阅他们?

最佳答案

However, while the eventStream still exists and should be kept in memory, there has been 2 new IObservables created by this method - the one returned by the Where() method that is presumably held in memory by the eventStream, and the one returned by the Cast() method that is presumably held in memory by the one returned by the Where() method.

你有这个落后。让我们来看看正在发生的事情的链条。

IObservable<T> eventStream; //you have this defined and assigned somewhere

public IDisposable Subscribe<T>(IObserver<T> observer)
{
    //let's break this method into multiple lines

    IObservable<T> whereObs = eventStream.Where(e => e is T);
    //whereObs now has a reference to eventStream (and thus will keep it alive), 
    //but eventStream knows nothing of whereObs (thus whereObs will not be kept alive by eventStream)
    IObservable<T> castObs = whereObs.Cast<T>();
    //as with whereObs, castObs has a reference to whereObs,
    //but no one has a reference to castObs
    IDisposable ret = castObs.Subscribe(observer);
    //here is where it gets tricky.
    return ret;
}

ret 引用或不引用的内容取决于各种可观察对象的实现。从我在 Rx 库的 Reflector 中看到的和我自己编写的运算符来看,大多数运算符不会返回引用了运算符可观察对象本身的一次性对象。

例如,Where 的基本实现类似于(直接在编辑器中键入,无错误处理)

IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> filter)
{
    return Observable.Create<T>(obs =>
      {
         return source.Subscribe(v => if (filter(v)) obs.OnNext(v),
                                 obs.OnError, obs.OnCompleted);
      }
}

请注意,返回的一次性对象将通过创建的观察者引用过滤器函数,但不会引用 Where 可观察对象。 Cast 可以使用相同的模式轻松实现。本质上,运营商成为观察者包装工厂。

所有这一切对手头问题的暗示是,中间 IObservables 有资格在方法结束时 进行垃圾回收。传递给 Where 的过滤器函数会在订阅结束时一直存在,但是一旦订阅被处理或完成,只有 eventStream 会保留(假设它仍然存在)。

编辑 对于 supercat 的评论,让我们看看编译器如何重写它或者你将如何在没有闭包的情况下实现它。

class WhereObserver<T> : IObserver<T>
{
    WhereObserver<T>(IObserver<T> base, Func<T, bool> filter)
    {
        _base = base;
        _filter = filter;
    }

    IObserver<T> _base;
    Func<T, bool> _filter;

    void OnNext(T value)
    {
        if (filter(value)) _base.OnNext(value);
    }

    void OnError(Exception ex) { _base.OnError(ex); }
    void OnCompleted() { _base.OnCompleted(); }
}

class WhereObservable<T> : IObservable<T>
{
    WhereObservable<T>(IObservable<T> source, Func<T, bool> filter)
    {
        _source = source;
        _filter = filter;
    }

    IObservable<T> source;
    Func<T, bool> filter;

    IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(new WhereObserver<T>(observer, filter));
    }
}

static IObservable<T> Where(this IObservable<T> source, Func<T, bool> filter)
{
    return new WhereObservable(source, filter);
}

您可以看到观察者不需要对生成它的可观察对象的任何引用,并且可观察对象也不需要跟踪它创建的观察者。我们甚至没有创建任何新的 IDisposable 来从我们的订阅中返回。

实际上,Rx 有一些用于匿名可观察对象/观察者的实际类,它们接受委托(delegate)并将接口(interface)调用转发给这些委托(delegate)。它使用闭包来创建这些委托(delegate)。编译器不需要发出实际实现接口(interface)的类,但翻译的精神保持不变。

关于c# - 没有最终订阅者的 'Intermediate IObservables' 在根 IObservable 的生命周期内保留在内存中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9737711/

相关文章:

c# - 如何将 sql 查询的结果映射到对象?

c# - 我的加速度计代码有什么问题?

c# - 我如何在不丢弃 RX 中的值的情况下减慢 Observable 的速度?

c# - 将 cookie 添加到 Request.Cookies 集合

c# - 在 DB 中自动聚合数据的最佳方法

c# - 是否有一种简洁的内置方法可以通过索引获取列表项而不会引发异常?

c# - IObservable - 如何发送/发布/推送新值到集合

c# - 使用 Reactive Extensions 动态连接序列

c# - 如何使 C# winform 应用程序可移植

c# - 如何使用控件(例如按钮)从表单调用事件?