我需要通过针对某些 Web 服务检查条目来过滤由 observable 发出的条目。普通的 observable.filter 运算符在这里不适用,因为它期望谓词函数同步返回判定,但在这种情况下,只能异步检索判定。
我可以通过以下代码进行转换,但我想知道是否有一些更好的运算符可以用于这种情况。
someObservable.flatmap(function(entry) {
return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
return {
verdict: verdict,
entry: entry
};
});
}).filter(function(obj) {
return obj.verdict === true;
}).map(function(obj) {
return obj.entry;
});
最佳答案
以下是您如何使用现有运算符实现此类运算符。您需要考虑一个问题。因为您的过滤操作是异步的,所以新项目到达的速度可能比您的过滤操作处理它们的速度更快。在这种情况下应该怎么办?你想按顺序运行过滤器并保证你的项目顺序得到维护吗?您想要并行运行过滤器并接受您的项目可能以不同的顺序出现吗?
这是运算符的两个版本
// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
return this.flatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
return this.concatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
用法:
var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);
关于javascript - RxJs 中是否有 "async"版本的过滤器运算符?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28490700/