javascript - RxJs 中是否有 "async"版本的过滤器运算符?

标签 javascript rxjs

我需要通过针对某些 Web 服务检查条目来过滤由 observable 发出的条目。普通的 observable.filter 运算符在这里不适用,因为它期望谓词函数同步返回判定,但在这种情况下,只能异步检索判定。

我可以通过以下代码进行转换,但我想知道是否有一些更好的运算符可以用于这种情况。

someObservable.flatmap(function(entry) {
  return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
    return {
      verdict: verdict,
      entry: entry
    };
  });
}).filter(function(obj) {
  return obj.verdict === true;
}).map(function(obj) {
  return obj.entry;
});

最佳答案

以下是您如何使用现有运算符实现此类运算符。您需要考虑一个问题。因为您的过滤操作是异步的,所以新项目到达的速度可能比您的过滤操作处理它们的速度更快。在这种情况下应该怎么办?你想按顺序运行过滤器并保证你的项目顺序得到维护吗?您想要并行运行过滤器并接受您的项目可能以不同的顺序出现吗?

这是运算符的两个版本

// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
    return this.flatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};

// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
    return this.concatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};

用法:

var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);

关于javascript - RxJs 中是否有 "async"版本的过滤器运算符?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28490700/

相关文章:

javascript - 使用 take 和 toPromise 消耗 hot observable 的下一个值

angular - RXJS - 多个连续的 http 请求

javascript - 在 socket.io 中使用箭头函数

javascript - 如何将值从js传递到html.erb渲染函数

javascript - 无法读取未定义的属性 'tz'- Ember 时刻时区

javascript - 如何在 AngularJS ng-repeat 中呈现包含 `new Image()` 的变量?

javascript - z-index 的问题

javascript - 从 RxJS subscribe() 函数访问声明为组件的变量

RxJS 排队相关任务

typescript - 如何获取 RxJS Subject 的先前值?