C# Rx 如何在创建的 Observable 中正确处理源 Enumerable

标签 c# ienumerable system.reactive dispose idisposable

我想将一个 IEnumerable、IDisposable(源)改编成一个 Observable,并且想知道执行此操作的最佳方法并在取消订阅时调用 source.Dispose 方法。

有一个example关于改编 IEnumerable 的 introtorx.com,但它明确指出它有许多缺点,例如不正确的处理模式、糟糕的并发模型、没有错误处理等......并且内置版本可以处理这些问题。但是内置版本似乎不会在取消订阅时在源 IEnumerable 上调用 Dispose。

理想情况下,我想使用 .Publish().RefCount() 模式在同一源上拥有多个订阅者,并且只有源 Dispose()当他们都取消订阅时调用。

这是我尝试的代码,尽管它不起作用。

static void FromEnumerableTest() {
    var observable = Observable.Create<int>(
        observer => {
            var source = new JunkEnumerable();
            foreach (int i in source) {
                observer.OnNext(i);
            }
            return () => {
                source.Dispose();
            };
        })
        .SubscribeOn(Scheduler.Default)
        .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
        .Publish()
        .RefCount();

    //var observable = Observable.ToObservable(new JunkEnumerable())
    //    .SubscribeOn(Scheduler.Default)
    //    .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
    //    .Publish()
    //    .RefCount();

    Console.WriteLine("Press any key to subscribe");
    Console.ReadKey();

    var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
    subscription.Dispose();

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}


class JunkEnumerable : IEnumerable<int>, IDisposable {
    public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); }

    public IEnumerator<int> GetEnumerator() { return new Enumerator(); }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

    class Enumerator : IEnumerator<int> {
        private int counter = 0;
        public int Current {
            get {
                Thread.Sleep(1000);
                return counter++;
            }
        }

        object IEnumerator.Current { get { return Current; } }

        public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); }

        public bool MoveNext() { return true; }

        public void Reset() { counter = 0; }
    }
}

最佳答案

Rx 订阅生命周期分为三个阶段:

  1. 订阅
  2. 观察
  3. 退订

如果订阅从未完成,则不会出现取消订阅代码。毕竟,如果您从未完全订阅过,为什么还需要退订呢?您的示例代码在订阅代码中有一个无限循环,因此它永远不会完成,因此取消订阅代码永远不会发生。

处理 IDisposable 的正常方法是使用 Observable.Using。处理 IEnumerable 的正常方法是使用 .ToObservable。如果您尝试将异步引入同步的可枚举代码(如您的示例),您可以按如下方式进行:

var observable = Observable.Using(() => new JunkEnumerable(), junk => 
    Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20))
);

只要 TimeSpan 大于 15 毫秒,Rx 就会将其转为异步,完成订阅。后续值属于观察阶段的一部分,取消订阅将完全发生。

关于C# Rx 如何在创建的 Observable 中正确处理源 Enumerable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41251667/

相关文章:

f# - 在可移植类库上使用 F# 中的 Rx 现在还有一些问题吗?

system.reactive - 自第一个新组元素以来超时的 Rx 缓冲区

c# - 哪个响应式扩展代码更高效?

c# - IoC 和绑定(bind)到接口(interface)

c# - 如何在避免不必要的副本的同时从 List<T> 获取 Span<T>?

c# - Microsoft 集合指南 : Confused about several parts

c# - 如何使用 yield break 跳出递归 IEnumerable<T> 循环?

c# - 为什么 .ForEach() 在 IList<T> 而不是 IEnumerable<T> 上?

c# - 如何在没有 CopyFile 或 CopyFileEx 的情况下在 Windows 上复制大文件?

c# - 将带参数的函数传递给不期望任何参数的委托(delegate)