考虑以下代码:
var xs = Observable.Create<Unit>(async o =>
{
await Task.Delay(10);
o.OnError(new Exception());
}).Replay().RefCount();
xs.Subscribe(x => Console.WriteLine(x));
xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
await xs.DefaultIfEmpty();
上面的序列不会抛出任何异常并且永远不会完成。
我做了以下观察:
- 删除第一个订阅会导致错误传播 - 在
Subscribe
上下文中抛出异常(最后一行) - 删除
.Replay().RefCount()
启用错误传播 - 在Subscribe
上下文中抛出异常(最后一行) - 删除
await Task.Delay(10)
会启用错误传播 - 在OnError
调用中抛出异常(在Create
方法中)。令人惊讶的是,切换两个Subscribe
方法会在Subscribe
上下文(最后一行)处抛出异常。
话虽如此,我想问的是以下问题是否是设计使然:
- 上述场景中的可观察序列永远不会完成
- 有时在
Create
方法中抛出异常,有时在Subscribe
上下文中抛出异常。
如果这是设计使然,您会推荐什么解决方法?在这种情况下,如何发布我的序列以便我的所有客户(观察者)都可以安全地处理异常?当前的行为似乎非常随意,尤其是对于库创建者而言。这也让调试变得非常痛苦。请指教。
最佳答案
首先回答你的问题:
- 不,这不是设计使然。这是一个错误。
- “解决方法”是不要混合使用 TPL 和 Reactive。你可以打这样有趣的东西。
以下按预期工作:
var xs = Observable.Throw<Unit>(new Exception())
.Delay(TimeSpan.FromMilliseconds(10))
.Replay()
.RefCount();
这会导致在第一次 .Subscribe
和 await xs.DefaultIfEmpty()
调用时引发异常。由于延迟,您会得到两个异常:多个线程正在运行。
至于为什么会这样,这里是一个开始:
第一个 Subscribe 代码基本上转换为以下内容。 ( See source ):
xs.Subscribe(x => Console.WriteLine(x), Stubs.Throw, Stubs.Nop);
public static class Stubs
{
public static readonly Action Nop = delegate
{
};
public static readonly Action<Exception> Throw = delegate (Exception ex)
{
var edi = ExceptionDispatchInfo.Capture(ex);
edi.Throw();
};
}
如果您在 Stubs
类中设置断点,您会看到它进入该类并尝试抛出异常。然而,异常并没有出现,很可能是由于一些奇怪的 TPL/ReplaySubject 交互。
关于c# - 发布可观察对象时异步创建挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51714221/