我想将一个 IEnumerable、IDisposable(源)改编成一个 Observable,并且想知道执行此操作的最佳方法并在取消订阅时调用 source.Dispose 方法。
有一个example关于改编 IEnumerable 的 introtorx.com,但它明确指出它有许多缺点,例如不正确的处理模式、糟糕的并发模型、没有错误处理等......并且内置版本可以处理这些问题。但是内置版本似乎不会在取消订阅时在源 IEnumerable 上调用 Dispose。
理想情况下,我想使用 .Publish().RefCount()
模式在同一源上拥有多个订阅者,并且只有源 Dispose()
当他们都取消订阅时调用。
这是我尝试的代码,尽管它不起作用。
static void FromEnumerableTest() {
var observable = Observable.Create<int>(
observer => {
var source = new JunkEnumerable();
foreach (int i in source) {
observer.OnNext(i);
}
return () => {
source.Dispose();
};
})
.SubscribeOn(Scheduler.Default)
.Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
.Publish()
.RefCount();
//var observable = Observable.ToObservable(new JunkEnumerable())
// .SubscribeOn(Scheduler.Default)
// .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
// .Publish()
// .RefCount();
Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
subscription.Dispose();
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
class JunkEnumerable : IEnumerable<int>, IDisposable {
public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); }
public IEnumerator<int> GetEnumerator() { return new Enumerator(); }
IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
class Enumerator : IEnumerator<int> {
private int counter = 0;
public int Current {
get {
Thread.Sleep(1000);
return counter++;
}
}
object IEnumerator.Current { get { return Current; } }
public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); }
public bool MoveNext() { return true; }
public void Reset() { counter = 0; }
}
}
最佳答案
Rx 订阅生命周期分为三个阶段:
- 订阅
- 观察
- 退订
如果订阅从未完成,则不会出现取消订阅代码。毕竟,如果您从未完全订阅过,为什么还需要退订呢?您的示例代码在订阅代码中有一个无限循环,因此它永远不会完成,因此取消订阅代码永远不会发生。
处理 IDisposable
的正常方法是使用 Observable.Using
。处理 IEnumerable
的正常方法是使用 .ToObservable
。如果您尝试将异步引入同步的可枚举代码(如您的示例),您可以按如下方式进行:
var observable = Observable.Using(() => new JunkEnumerable(), junk =>
Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20))
);
只要 TimeSpan 大于 15 毫秒,Rx 就会将其转为异步,完成订阅。后续值属于观察阶段的一部分,取消订阅将完全发生。
关于C# Rx 如何在创建的 Observable 中正确处理源 Enumerable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41251667/