在 RxJS 版本 5 中,以下代码会导致进程在两个订阅的三次迭代后终止:
var Rx = require("rxjs");
const published$ = Rx.Observable.interval(1000).publish();
published$.subscribe(index => {
console.log(`One: ${index}`);
if (index == 3) throw new Error("ded.");
});
published$.forEach(index => {
console.log(`Two: ${index}`);
});
published$.connect();
但是,我的理解是,在下一个处理程序中抛出的错误只会取消订阅该特定订阅,而不会导致底层可观察对象终止。我的预期输出是“一”订阅将取消订阅,但该时间间隔将继续为“二”订阅产生结果。
这种行为给我带来了问题,我可能对底层热可观察对象有多个订阅 - 但任何这些订阅上抛出的单个异常都会导致底层可观察对象完全终止。
当我使用热模块重新加载进行开发时,这尤其烦人,因为任何订阅中的任何编程错误都会导致我必须刷新整个页面才能重新启动可观察序列。
有没有一种方法,无需将我的每个订阅包装在 try/catch 中,就可以在下一个处理程序中抛出异常,以简单地取消订阅该 ONE 订阅,而不终止底层的可观察对象?
------------ 编辑 ------------
通过在“subscribe”返回的订阅对象上将syncErrorThrowable 设置为true,我找到了我正在寻找的行为。似乎在代码库中唯一一次将其设置为 true 是通过“do”运算符。
我应该利用这个领域吗?我觉得使用它很肮脏,但另一方面,我发现奇怪的是“do”运算符与“next”订阅处理程序具有不同的错误处理语义。
这是受此标志影响的主要代码块: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L132
如果它设置为 false,则调用此方法: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L179
如果它设置为 true,则使用此方法: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L188
区别在于,第一个方法会将异常重新抛出到调用堆栈中,而第二个方法会将错误传播到后续订阅。
为什么 do 运算符向前传播错误,而“下一个”处理程序则将错误向上冒泡?这对我来说似乎很奇怪。
最佳答案
不,不要使用该字段。如果您将其更改回 true,您的订阅将开始吞咽错误。
这是一些私有(private)状态,我们用它来知道订阅是同步通知(与源 Observable 的订阅调用在同一 block 中)还是异步通知。如果在同步通知期间订阅者的消息处理程序之一抛出错误,我们将推迟重新抛出该错误,直到退出 Observable 的订阅回调。[1]
如果您的处理程序抛出错误,并且您希望将其转发到订阅的 onError
处理程序,则当前的指导是将它们移至订阅上方的 do
block 中。
不,我不同意这种行为。以下是一些上下文链接:
[1]来源:我写了这段代码。
关于javascript - 处理热、共享、可观察量的 "onNext"中抛出的错误的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35740821/