rxjs - 从生产者端取消 Observable,而不是消费者端

标签 rxjs reactive-programming observable

从消费者端取消,可能会使用 takeUntil 调用,但这不一定非常动态。但是,在这种情况下,我希望从等式的生产者一侧取消 Observable,就像您可能希望取消 promise 链中的 Promise 一样(这对于 native 实用程序不太可能)。

假设我从一个方法返回了这个 Observable。 (这个 Queue 库是一个简单的持久化队列,可以读/写文本文件,我们需要锁定读/写,这样不会有任何损坏)。

Queue.prototype.readUnique = function () {

    var ret = null;
    var lockAcquired = false;

    return this.obsEnqueue
        .flatMap(() => acquireLock(this))
        .flatMap(() => {
            lockAcquired = true;
            return removeOneLine(this)
        })
        .flatMap(val => {
            ret = val;   // this is not very good
            return releaseLock(this);
        })
        .map(() => {
            return JSON.parse(ret);
        })
        .catch(e => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return genericObservable();
            }
        });

};

我有两个不同的问题-
  • 如果我无法获得锁,我该如何“取消”observable,只发回一个没有结果的空 Observable?我真的必须在每个返回调用中执行 if/else 逻辑来决定当前链是否被取消,如果是,则返回一个空的 Observable 吗?空,我的意思是一个 Observable,它简单地触发 onNext/onComplete,没有任何错误的可能性,也没有 onNext 的任何值。从技术上讲,我不认为那是一个空的 Observable,所以我正在寻找它的真正名称,如果它存在的话。
  • 如果您查看此特定代码序列:
    .flatMap(() => acquireLock(this))
    .flatMap(() => {
        lockAcquired = true;
        return removeOneLine(this)
    })
    .flatMap(val => {
        ret = val;
        return releaseLock(this);
    })
    .map(() => {
        return JSON.parse(ret);
    })
    

  • 我正在做的是在方法的顶部存储对 ret 的引用,然后在一步之后再次引用它。我正在寻找的是一种将 removeOneLine() 触发的值传递给 JSON.parse() 的方法,而不必在链外设置一些状态(这只是不雅)。

    最佳答案

    1) 这取决于你的方法 acquireLock有效 - 但我假设它在无法获取锁的情况下会抛出错误,在这种情况下,您可以使用 catch 创建流。并将回退流设置为空流:

    return Rx.Observable.catch(
            removeLine$,
            Rx.Observable.empty()
        );
    

    2) 为了避免有状态的外部变量,你可以简单地链接一个 mapTo :
    let removeLine$ = acquireLock(this)
        .flatMap(() => this.obsEnqueue
            .flatMap(() => removeOneLine(this))
            .flatMap(val => releaseLock(this).mapTo(val))
            .map(val => JSON.parse(val))
            .catch(() => releaseLock(this))
        );
    

    关于rxjs - 从生产者端取消 Observable,而不是消费者端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41307359/

    相关文章:

    javascript - 在 VueJS 中如何判断某个元素是否已重新渲染?

    ios - 在 ReactiveCocoa 中链接依赖信号

    ios - 为什么 RACCommand 的 block 返回信号?

    javascript - typescript 错误 : Property '0' is missing in type

    Angular:如何在随后的 Observable 中使用 `.subscribe()` 的结果

    swift - 如何通过访问 RxSwift 中的最新值来模拟计算属性?

    angular - 如何停止 rxjs 间隔/计时器

    Angular 2 变化检测与 observables

    Angular 6 ng lint combineLatest 已弃用

    node.js - Nest JS - 为返回 Observable Axios Response 的函数编写 Jest 测试用例时出现问题