c# - 是否可以重新启动 IConnectableObservable?

标签 c# system.reactive reactive-programming

我正在尝试弄清楚如何在 IConnectableObservable 完成或出错后“重新启动”它。

下面的代码显示了一个可连接的可观察对象的两个订阅者(A,B)。一旦他们的订阅被处置,一个新的订阅者 (C) 就会连接。我希望它在 C 看来是一个全新的可观察对象,但我只在第一次订阅时收到异常。

static void Main(string[] args)
{
    var o = Observable.Create<string>(observer =>
        {
            observer.OnNext("msg");
            observer.OnError(new Exception("boom"));
            return Disposable.Create(() => {
                Console.WriteLine("Observer has unsubscribed");
            });
        }
    )
    .Publish();

    o.Subscribe(
        x => Console.WriteLine("A: " + x),
        ex => Console.WriteLine("A: " + ex),
        () => Console.WriteLine("A: done"));

    o.Subscribe(
        x => Console.WriteLine("B: " + x),
        ex => Console.WriteLine("B: " + ex),
        () => Console.WriteLine("B: done"));            

    var subscription = o.Connect();

    subscription.Dispose();

    o.Subscribe(
        x => Console.WriteLine("C: " + x),
        ex => Console.WriteLine("C: " + ex),
        () => Console.WriteLine("C: done"));        

    subscription = o.Connect();

}

给出以下结果:

A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: System.Exception: boom
Observer has unsubscribed

而我想:

A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: msg
C: System.Exception: boom
Observer has unsubscribed

有什么想法吗?谢谢!

最佳答案

虽然它不会“重启”可观察对象,但将 Publish 替换为 Replay提供您期望的输出。但是,请记住,这将缓冲来自源可观察对象的所有值。最好限制重播值的数量。

关于c# - 是否可以重新启动 IConnectableObservable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47739717/

相关文章:

javascript - 使用 FRP 管理状态

c# - 如何在不使用 Entity Framework 的情况下使用 ASP NET MVC5 显示 SQL Server 中的表?

c# - Windows Phone 锁屏下导航

ios - 如何限制 observable 直到条件成立

c# - 为什么 Subject<T>.Dispose 不处理当前订阅?

swift - 'Bool' 不可转换为 '() throws -> Bool'

azure - 使用 Reactive X 流处理来自事件中心的数据

c# - 非托管到托管结构

c# - selenium webdriver ijavascriptexecuter executescript 参数类型

c# - 制作一个使用 async/await 的 IObservable<T> 以原始顺序返回已完成的任务