在学习 Rx 的过程中,我遇到了一个关于 Observables 的经常重复的规则,它在 The Observable Contract 中有详细说明。 .
Upon issuing an OnCompleted or OnError notification, it may not thereafter issue any further notifications.
这对我来说很有意义,因为让一个 Observable 在完成后继续产生值会让人感到困惑,但是当我在 .NET 中测试 Observable.Range 方法时,我注意到它没有表现出这种行为,事实上,许多 Observables 违反了这条规则。
var rangeObservable = Observable.Range(0, 5);
rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done first!"));
Console.ReadLine();
rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done second!"));
Console.ReadLine();
//Output:
//0
//1
//2
//3
//4
//Done first!
//0
//1
//2
//3
//4
//Done second!
很明显,rangeObservable
调用了两次 OnComplete
,并在第一次 OnComplete
之后产生了值。这让我相信这不是关于Observables 的规则,而是关于订阅 的规则。也就是说,Observable 可以根据需要生成任意数量的终止消息,甚至可以在生成之后生成值,只要每个订阅 只接收一个终止消息并且不接收任何消息之后的进一步消息。
当它说Observable 时,它们实际上是指订阅吗?它们真的是不同的东西吗?我对模型有根本性的误解吗?
最佳答案
可观察契约必须对任何被观察的可观察对象有效。 当 Observable 未被观察时是否发生任何事情都留给 observable 的实现。
它有助于在 Enumerable 中考虑模拟 - Observable 是 Enumerable 的对偶。在枚举中,你会有
range = Enumerable.Range(0, 5)
,您将使用与上述类似的范围:
range.ForEach(Console.WriteLine); //prints 0 - 4
range.ForEach(Console.WriteLine); //prints 0 - 4 again
并发现这是完全可以接受的行为,因为只有在调用 GetEnumerator
时才会创建实际的数字生成器。同样,在 Observable 中,等效的方法是 Subscribe
。
range 的实现是这样的:
static IObservable<int> Range(int start, int count)
{
return Observable.Create<int>(observer =>
{
for (int i = 0; i < count; i++)
observer.OnNext(start + i);
observer.OnCompleted();
return Disposable.Empty;
});
}
此处,每次有订阅时都会调用 observer => {...}
函数。工作在 subscribe 方法中完成。您可以很容易地看到它 (1) 为每个观察者推送相同的序列,(2) 每个观察者只完成一次。
这些仅当您观察它们时才会发生某些事情的可观察对象称为冷可观察对象。 Here's an article描述概念。
注意
Range
是一个非常简单的实现,仅用于说明目的。该方法在完成之前不会返回一次性元素 - 因此 Disposable.Empty
是可以接受的。一个正确的实现将在调度程序上运行工作,并在继续循环之前使用已检查的可释放项来查看订阅是否已被释放。
要点是手动实现可观察契约很难,这就是 Rx 库存在的原因 - 通过组合构建功能。
关于c# - Observable.Range 是否违反了 Observable 契约?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41404034/