exception - RXJS - 捕捉观察者抛出的异常

标签 exception rxjs

我是 rxjs 的新手并且正在努力处理异常。

采取以下代码

let sub = new Subject();

let observer1:Observer<String> =  {
    next : v => {
        console.log("next1-" + v);
        if(v==='fail') {
            throw new Error("fail1");
        }
    },
    error : e => console.error("error1-" + e),
    complete : () => console.log("complete1")
};
sub.subscribe(observer1);

try {
    sub.next("msg1");
    sub.next("msg2");
    sub.next("fail");
    sub.next("msg3");
} catch(e) {
    console.log("Caught:" + e);
}

console.log("That's all");

我知道在异常之后,主题基本上已经死了,而 msg3 永远不会成功。

我似乎无法弄清楚如何捕获 Observer 的 next 方法中抛出的异常。

我得到的输出是

> next1-msg1 next1-msg2 next1-fail That's all
> /Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/util/hostReportError.js:9
>     setTimeout(function () { throw err; });
>                              ^
> 
> Error: fail1
>     at Object.next (/Users/peter/playground/rxjsstuff/dist/rxjs1.js:97:19)
>     at SafeSubscriber.__tryOrUnsub (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:263:16)
>     at SafeSubscriber.next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:201:22)
>     at Subscriber._next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:139:26)
>     at Subscriber.next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:103:18)
>     at Subject.next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subject.js:63:25)
>     at Object.<anonymous> (/Users/peter/playground/rxjsstuff/dist/rxjs1.js:107:9)
>     at Module._compile (internal/modules/cjs/loader.js:678:30)
>     at Object.Module._extensions..js (internal/modules/cjs/loader.js:689:10)
>     at Module.load (internal/modules/cjs/loader.js:589:32)

我想我在这里遗漏了一些基本的东西,但我似乎找不到什么。

我想高层次的问题是 - 当 Subject 调用 next 时,如何处理 Observer 的 next 方法中抛出的错误/异常?

欢迎所有提示!

发送

彼得

最佳答案

简短的回答是你不能也不应该。一个可观察的源如何知道它的观察者会抛出什么样的错误?

长的答案是 RxJS 中的错误处理已经改变 - 在版本 6 中变得更好。

如果你看看 next 的实现在 Subject你会看到没有错误处理:

next(value?: T) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  if (!this.isStopped) {
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].next(value);
    }
  }
}

主题只是循环遍历它的观察者和 next在各个。

然而,每个观察者都被包裹在 Subscriber 中。 .如果您查看 subscribe 的来源,你会看到一个 Subscriber通过将观察者传递给 toSubscriber 创建.

特别是,创建的订阅者是 SafeSubscriber .这就是错误处理的地方。

如果你看 next SafeSubscriber ,你会看到 __tryOrUnsub叫做:

next(value?: T): void {
  if (!this.isStopped && this._next) {
    const { _parentSubscriber } = this;
    if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
      this.__tryOrUnsub(this._next, value);
    } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
      this.unsubscribe();
    }
  }
}
__tryOrUnsub将尝试调用观察者的 next如果发生错误,它将从源取消订阅观察者。
__tryOrUnsub 捕获的任何错误将使用 hostReportError 报告- 异步抛出错误,以便调用堆栈不会展开。这样做是为了使一个观察者中发生的错误不会影响其他观察者。

如果您在示例中添加第二个观察者 - 不会抛出 - 您应该看到第二个观察者的行为与您期望的一样并收到 "msg3" .

Ben Lesh 在 recent presentation 中解释了这些变化 - 以及做出这些变化的原因.你可能想检查一下。

关于exception - RXJS - 捕捉观察者抛出的异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50651319/

相关文章:

java - 在 NullPointerException 和 IllegalArgumentException 之间进行选择以在对象构造时发出错误信号

javascript - ComponentRef.destroy() 方法是否也会取消订阅组件事件发射器?

angular - RXJS- Angular : Two Observable<booleans> should return Observable<boolean>

c++ - 保证某个函数在销毁前被调用

C# HttpClient 在非成功状态代码上抛出异常

.net - 在 WCF 服务中测试异常处理

angular - NgRx router-store 在离开当前页面时发出路由参数更改

angular - RxJS forkJoin 不发出值

node.js - 如何使用 RxJS observables 实现数据库连接/选择/关闭

objective-c - Swift 与 Obj-C 异常。什么是堆栈展开?为什么 swift 不这样做?