reactive-programming - 捕获可观察量之间的循环依赖

标签 reactive-programming rxjs reactive-extensions-js

我有一个用户编程场景,用户最终可以创建两个相互依赖的可观察量。 RxJS 不允许循环依赖,据我所知,内存或堆栈达到其限制,并使用值 true 触发 onError 回调。

如何显式检测循环依赖并抛出更具描述性的错误消息?

此代码说明了如何在 RxJS 中创建循环依赖:

var obsA,
    obsB;

obsA = Rx.Observable
    .returnValue(42)
    .combineLatest(obsB, function (a, b) {
        return a + b;
    });

obsB = Rx.Observable
    .returnValue(42)
    .combineLatest(obsA, function (b, a) {
        return b + a;
    });


obsA
    .subscribe(function (val) {
        console.log('onNext:' + val);
    },
    function (err) {
        console.error('onError: ' + err);
    },
    function () {
        console.log('onCompleted');
    });

错误消息只是true

最佳答案

原始问题中的代码不会创建循环依赖项。当您定义 ObsA 时, ObsBundefined所以你真正做的就是调用 combineLatest(undefined, function ...) 。所以您看到的错误是因为您正在传递 undefinedcombinedLatest() .

创建真正的循环依赖实际上需要一些努力。如果您使用defer ,那么你就会有一个真正的循环依赖:

var obsA,
    obsB,
    aRef,
    bRef;

aRef = Rx.Observable.defer(function () {
    return obsA;
});

bRef = Rx.Observable.defer(function () {
    return obsB;
});

obsA = Rx.Observable
    .returnValue(42)
    .combineLatest(bRef, function (a, b) {
            return a + b;
    });

obsB = Rx.Observable
    .returnValue(42)
    .combineLatest(aRef, function (b, a) {
        return b + a;
    });

obsA.subscribe();
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

现在这是一个真正的循环依赖。不幸的是,尽管堆栈跟踪更深,但您仍然遇到相同的错误:

RangeError: Maximum call stack size exceeded.
/* ... stack ... */

没有万无一失的方法来检测周期。您可以将可观察量包装在新的可观察量中,并检测对订阅方法的递归调用。但如果底层可观察量使用subscribeOn,这样的算法就会失败。或publishconcat任何其他延迟实际循环订阅的事情。

我最好的建议是附加 catch检查范围错误并将其替换为更好的错误的子句:

var obsA,
    obsB,
    aRef,
    bRef;

aRef = Rx.Observable.defer(function () {
    return obsA;
});

bRef = Rx.Observable.defer(function () {
    return obsB;
});

obsA = Rx.Observable
    .returnValue(42)
    .combineLatest(bRef, function (a, b) {
            return a + b;
    })
    .catch(function (e) {
        var isStackError = e instanceof RangeError && e.message === 'Maximum call stack size exceeded';
    
        return Rx.Observable.throw(isStackError ? new Error('Invalid, possibly circular observables.') : e);
    });

obsB = Rx.Observable
    .returnValue(42)
    .combineLatest(aRef, function (b, a) {
        return b + a;
    })
    .catch(function (e) {
        var isStackError = e instanceof RangeError && e.message === 'Maximum call stack size exceeded';
    
        return Rx.Observable.throw(isStackError ? new Error('Invalid, possibly circular observables.') : e);
    });

obsA.subscribe();
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

关于reactive-programming - 捕获可观察量之间的循环依赖,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17384879/

相关文章:

javascript - 如何清除 .zip 队列?

rxjs - 如何同步 RxJS 更新,以便中间值不会通过流传递?

javascript - Observable.flatMap最新: Cancel at a deeper level?

c# - 我如何使用 Reactive Extensions 将 IObservable<T>.Throttle() 与其他一些事件源结​​合起来?

javascript - RxJs:如何只维护最新值直到内部可观察完成

javascript - 缓冲/忽略事件,直到不活动期间,然后仅触发最后一个事件

javascript - 使用 RxJs 将分页请求转换为 Observable 流

mvvm - react 性扩展(Rx)+ MVVM =?

facebook - 网络关闭时如何处理 React-Native 中的网络故障

node.js - rxjs、 Node 、订阅处理程序中的内存泄漏