考虑以下几点:
[Fact]
public void foo()
{
var result = new Subject<bool>();
var startCount = 0;
var completionCount = 0;
var obs = Observable
.Defer(() =>
{
++startCount;
return result.FirstAsync();
})
.Do(_ => ++completionCount)
.Publish()
.RefCount();
// pretend there are lots of subscribers at once
var s1 = obs.Subscribe();
var s2 = obs.Subscribe();
var s3 = obs.Subscribe();
// even so, we only expect to be started once
Assert.Equal(1, startCount);
Assert.Equal(0, completionCount);
// and we won't complete until the result ticks through
result.OnNext(true);
Assert.Equal(1, startCount);
Assert.Equal(1, completionCount);
s1.Dispose();
s2.Dispose();
s3.Dispose();
// now try exactly the same thing again
s1 = obs.Subscribe();
s2 = obs.Subscribe();
s3 = obs.Subscribe();
// startCount is 4 here instead of the expected 2!
Assert.Equal(2, startCount);
Assert.Equal(1, completionCount);
result.OnNext(true);
Assert.Equal(2, startCount);
Assert.Equal(2, completionCount);
s1.Dispose();
s2.Dispose();
s3.Dispose();
}
我对 Publish
+ RefCount
的理解是,只要至少有一个订阅者,就会保持与源的连接。一旦最后一个订阅者断开连接,任何 future 的订阅者都将重新启动与源的连接。
正如您在我的测试中看到的那样,第一次通过时一切都完美无缺。但是第二次,管道内的延迟可观察对象会为每个新订阅者执行一次。
我可以通过调试器看到,对于第一组订阅者,每次调用 Subscribe
时,obs._count
(对订阅者进行计数)都会增加。但对于第二组订阅者,它仍然为零。
为什么会发生这种情况,我可以做些什么来纠正我的管道?
最佳答案
@user631090 的答案很接近,但不正确,所以我想我会自己回答。
这是因为 Publish
如果它发布的流本身已完成,它将立即完成新的订阅者。您可以在图中看到 here :
但是,如果图表在底层流完成后 包含一个订阅者,那就更好了。
雪上加霜的是,Defer
仍然需要为新订阅者调用。但由于初始流已完成,它的返回值被 Publish
忽略。
我还无法想出一种方法来实现我的预期用例。我想也许使用 Multicast
而不是 Publish
,根据需要创建一个新主题。但我还没有能够做到这一点。对于我认为是常见用例的情况来说,这似乎相当痛苦。
关于c# - 为什么在所有初始订阅者断开连接后 RefCount 不工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35692770/