c# - 为什么在所有初始订阅者断开连接后 RefCount 不工作?

标签 c# .net system.reactive

考虑以下几点:

[Fact]
public void foo()
{
    var result = new Subject<bool>();
    var startCount = 0;
    var completionCount = 0;
    var obs = Observable
        .Defer(() =>
            {
                ++startCount;
                return result.FirstAsync();
            })
        .Do(_ => ++completionCount)
        .Publish()
        .RefCount();

    // pretend there are lots of subscribers at once
    var s1 = obs.Subscribe();
    var s2 = obs.Subscribe();
    var s3 = obs.Subscribe();

    // even so, we only expect to be started once
    Assert.Equal(1, startCount);
    Assert.Equal(0, completionCount);

    // and we won't complete until the result ticks through
    result.OnNext(true);
    Assert.Equal(1, startCount);
    Assert.Equal(1, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();

    // now try exactly the same thing again
    s1 = obs.Subscribe();
    s2 = obs.Subscribe();
    s3 = obs.Subscribe();

    // startCount is 4 here instead of the expected 2!
    Assert.Equal(2, startCount);
    Assert.Equal(1, completionCount);

    result.OnNext(true);
    Assert.Equal(2, startCount);
    Assert.Equal(2, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();
}

我对 Publish + RefCount 的理解是,只要至少有一个订阅者,就会保持与源的连接。一旦最后一个订阅者断开连接,任何 future 的订阅者都将重新启动与源的连接。

正如您在我的测试中看到的那样,第一次通过时一切都完美无缺。但是第二次,管道内的延迟可观察对象会为每个新订阅者执行一次。

我可以通过调试器看到,对于第一组订阅者,每次调用 Subscribe 时,obs._count(对订阅者进行计数)都会增加。但对于第二组订阅者,它仍然为零。

为什么会发生这种情况,我可以做些什么来纠正我的管道?

最佳答案

@user631090 的答案很接近,但不正确,所以我想我会自己回答。

这是因为 Publish 如果它发布的流本身已完成,它将立即完成新的订阅者。您可以在图中看到 here :

enter image description here

但是,如果图表在底层流完成后 包含一个订阅者,那就更好了。

雪上加霜的是,Defer 仍然需要为新订阅者调用。但由于初始流已完成,它的返回值被 Publish 忽略。

我还无法想出一种方法来实现我的预期用例。我想也许使用 Multicast 而不是 Publish,根据需要创建一个新主题。但我还没有能够做到这一点。对于我认为是常见用例的情况来说,这似乎相当痛苦。

关于c# - 为什么在所有初始订阅者断开连接后 RefCount 不工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35692770/

相关文章:

运行时VS2005应用程序中的C#日期格式错误

c# - 磁盘簇/ block 大小与二进制文件读取有何关系?

c# - 如何使用响应式(Reactive)扩展将同步轮询数据库查询转换为异步推送?

c# - 单元测试带有延迟计时器的响应式(Reactive)扩展方法

c# - gridview 控件上的自定义分页

c# - 50GB HttpRuntime.Cache 持久化可能吗?

c# - NetUseAdd 返回错误 67 或 87

.net - Web 服务还是 DLL?

c# - Foo.cmd 不会输出进程中的行(在网站上)

c# - 制作一个使用 async/await 的 IObservable<T> 以原始顺序返回已完成的任务