我有这门课:
public class TestService
{
public IObservable<int> GetObservable(int max)
{
var subject = new Subject<int>();
Task.Factory.StartNew(() =>
{
for (int i = 0; i < max; i++)
{
subject.OnNext(i);
}
subject.OnCompleted();
});
return subject;
}
}
我也为此写了一个测试方法:
[TestMethod]
public void TestServiceTest1()
{
var testService = new TestService();
var i = 0;
var observable = testService.GetObservable(3);
observable.Subscribe(_ =>
{
i++;
});
observable.Wait();
Assert.AreEqual(i, 3);
}
但有时我会收到错误消息:Sequence contains no elements in method Wait().
我建议我的 IObservable 在测试到达 observable.Wait() 行之前完成。 我怎样才能避免这个错误?
最佳答案
在我看来,这段代码的基本问题是 IObservable
代表如何观察某物的契约。
在这种情况下,GetObservable
方法不只是返回契约(Contract),它正在立即执行工作(使用 TPL)。如果您认为可以多次订阅同一个 IObservable
,这就没有意义了。实例(这实际上发生在示例代码中,因为您第一次使用 Subscribe
订阅,而另一次使用 Wait
订阅)。 这个区别是我学习 Rx 的最大绊脚石。
我会改为实现这样的方法(并且完全避免使用 Subject<>
,因为它不是创建 Observable 的首选方式):
public class TestService
{
public IObservable<int> GetObservable(int max)
{
return Observable.Create<int>((IObserver<int> observer) =>
{
for (int i = 0; i < max; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
});
}
}
Observable.Create
也有有趣的覆盖这提供了更大的灵 active 。
关于c# - 等待 IObservable 获取所有元素错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16155094/