c# - Reactive Extensions 吞下在线程池线程上调用的 OnNext() 的异常?

标签 c# system.reactive reactive-programming

我在 .Net 4.5 中使用 Rx 2。当下面的代码运行时,它只是静默退出而不执行 OnCompleted 委托(delegate)或显示任何错误。如果我在 ToObservable 中使用 Scheduler.CurrentThread,它至少会抛出错误并终止程序,此时不执行 OnCompleted 是有意义的。但是当这在主线程之外的线程中执行时,这种行为似乎不合理且 Not Acceptable 。我错过了什么吗?

static void Main()
{
     Enumerable.Range(0, 1)
                           .ToObservable(Scheduler.Default)
                           .Subscribe(o => { throw new Exception("blah"); }, () => Console.WriteLine("completed"));

     Thread.Sleep(2000);
 }

已编辑: 是的,当作为控制台应用程序运行时,无论在哪个线程上执行观察,它总是会抛出错误。

但是,当我如下在 NUnit 中运行此代码作为测试时,它会在 2 秒(线程休眠时间)后静默退出,没有任何错误或消息(预期“已完成”)。那么问题实际上是 NUnit 造成的吗?

[TestFixture]
class Program
{
    [Test]
    public void Test()
    {
        Enumerable.Range(0, 1)
                .ToObservable(Scheduler.Default)
                .Subscribe(
                    o => { throw new Exception("blah"); }, 
                    () => Console.WriteLine("completed"));
        Thread.Sleep(2000);
    }
}

最佳答案

Rx 不会捕获观察者抛出的异常。这是一个非常重要的设计原则,之前已经详细讨论过,尽管出于某种原因,它仅作为 §6.4 的脚注包含在 Rx Design Guidelines 中。 .

Note: do not protect calls to Subscribe, Dispose, OnNext, OnError and OnCompleted methods. These calls are on the edge of the monad. Calling the OnError method from these places will lead to unexpected behavior.

从本质上讲,该准则确保从观察者的角度来看,OnError 只会被源自可观察对象本身的异常调用,包括对直接参与计算的用户代码的任何调用(而不仅仅是观察结果)。如果不是这种情况,那么观察者可能无法区分传递给 OnError 的异常是他们的 OnNext 处理程序中的错误还是可观​​察对象中的错误.

但更重要的是,它还能确保 OnNext 处理程序抛出的任何异常都不会得到处理。这样可以更轻松地调试您的程序并保护用户数据。

也就是说,当 OnNext 在池线程上执行时,您可能会观察到不同行为的原因仅仅是您调试经验的结果。尝试启用 first-chance exceptions .

此外,我还通过将 Thread.Sleep 更改为 Console.ReadKey() 来避免竞争条件。

关于c# - Reactive Extensions 吞下在线程池线程上调用的 OnNext() 的异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25825981/

相关文章:

c# - 如何访问命名空间中的函数

c# - 将大量表单数据发送到对象的最佳方式

c# - 按需提供数据,延迟加载(相当于yield return)

c# - 从多线程数据源平滑更新图表

c# - 这两种返回 JSON 对象的方式有什么区别?

c# - 这种将字符串强制为整数或默认为 0 的扩展方法是否多余?

c# - 如何延长 Rx.NET 中 Observable Timer 的持续时间?

vue.js - 如何将 RSocket 数据检索到不同的选项卡?

arrays - RxJS 限制和返回响应结果数组的最佳方式

javascript - 为什么在 Redux-Saga 上使用 Redux-Observable?