从消费者端取消,可能会使用 takeUntil 调用,但这不一定非常动态。但是,在这种情况下,我希望从等式的生产者一侧取消 Observable,就像您可能希望取消 promise 链中的 Promise 一样(这对于 native 实用程序不太可能)。
假设我从一个方法返回了这个 Observable。 (这个 Queue 库是一个简单的持久化队列,可以读/写文本文件,我们需要锁定读/写,这样不会有任何损坏)。
Queue.prototype.readUnique = function () {
var ret = null;
var lockAcquired = false;
return this.obsEnqueue
.flatMap(() => acquireLock(this))
.flatMap(() => {
lockAcquired = true;
return removeOneLine(this)
})
.flatMap(val => {
ret = val; // this is not very good
return releaseLock(this);
})
.map(() => {
return JSON.parse(ret);
})
.catch(e => {
if (lockAcquired) {
return releaseLock(this);
}
else {
return genericObservable();
}
});
};
我有两个不同的问题-
.flatMap(() => acquireLock(this))
.flatMap(() => {
lockAcquired = true;
return removeOneLine(this)
})
.flatMap(val => {
ret = val;
return releaseLock(this);
})
.map(() => {
return JSON.parse(ret);
})
我正在做的是在方法的顶部存储对 ret 的引用,然后在一步之后再次引用它。我正在寻找的是一种将 removeOneLine() 触发的值传递给 JSON.parse() 的方法,而不必在链外设置一些状态(这只是不雅)。
最佳答案
1) 这取决于你的方法 acquireLock
有效 - 但我假设它在无法获取锁的情况下会抛出错误,在这种情况下,您可以使用 catch
创建流。并将回退流设置为空流:
return Rx.Observable.catch(
removeLine$,
Rx.Observable.empty()
);
2) 为了避免有状态的外部变量,你可以简单地链接一个
mapTo
:let removeLine$ = acquireLock(this)
.flatMap(() => this.obsEnqueue
.flatMap(() => removeOneLine(this))
.flatMap(val => releaseLock(this).mapTo(val))
.map(val => JSON.parse(val))
.catch(() => releaseLock(this))
);
关于rxjs - 从生产者端取消 Observable,而不是消费者端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41307359/