javascript - RxJs:在 flatMapLatest 完成后访问 flatMapLatest 之前的数据

标签 javascript rxjs reactive-extensions-js

场景:

  1. 用户使用组合成单个流的过滤器
  2. 当过滤器发生变化时,触发后端事件以获取“廉价”数据
  3. 当“廉价”数据到达时,另一个具有相同参数的请求被触发到不同的端点,返回“昂贵”数据,这些数据将用于丰富廉价数据。请求应延迟 1 秒,并且仅在用户未更改任何过滤器时才会触发(否则应等待 1 秒)

我正在努力解决 3) 没有中间变量的选项。

let filterStream = Rx.Observable
.combineLatest(
  filterX,
  filterY,
  (filterX, filterY) => {
    x: filterX,
    y: filterY
  }
 )
 .map((filters) => {
  limit: 100,
  s: filters.x.a,
  f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()


let cheapDataStream = filterStream
.flatMapLatest((filterQuery) =>
Rx.Observable.fromPromise(cheapBackendApiCall(filterQuery)))

// render cheap results
cheapDataStream
.map(result => transformForDisplay(result))
.subscribe(result => { 
  //render
  // how do i invoke expensiveApiCall() with `filterQuery` data here?
  // with a delay, and only if filterQuery has not changed?

});

最佳答案

您可以利用隐式转换来避免在任何地方显式使用 fromPromise。然后你可以使用 concat 立即返回便宜的数据,然后延迟返回昂贵+便宜的数据。通过将其嵌套在 flatMapLatest 中,如果新查询到达,流还将取消任何未决的 expensiveCalls

var filters = Rx.Observable
.combineLatest(
  filterX,
  filterY,
  (filterX, filterY) => {
    x: filterX,
    y: filterY
  }
 )
 .map((filters) => {
  limit: 100,
  s: filters.x.a,
  f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
.flatMapLatest(filters => {
  //This kicks off immediately
  var cheapPromise = cheapBackendApiCall(filters);

  //This was added in the latest version 4.1, the function is only called once it is subscribed to, 
  //if you are using earlier you will need to wrap it in a defer instead.
  var expensivePromiseFn = () => expensiveBackendApiCall(filters);

  //For join implicitly calls `fromPromise` so you can pass the same 
  // sort of arguments.
  var cheapAndExpensive = Rx.Observable.forkJoin(
                            cheapPromise, 
                            expensivePromiseFn, 
                            (cheap, expensive) => ({cheap, expensive}));

  //First return the cheap, then wait 1500 millis before subscribing 
  //which will trigger the expensive operation and join it with the result of the cheap one
  //The parent `flatMapLatest` guarantees that this cancels if a new event comes in
  return Rx.Observable.concat(cheap, cheapAndExpensive.delaySubscription(1500));
})
.subscribe(x => /*Render results*/);

关于javascript - RxJs:在 flatMapLatest 完成后访问 flatMapLatest 之前的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36152713/

相关文章:

javascript - 在 rx 中创建资源的副作用( react 性扩展)

javascript - Rxjs吞掉错误

javascript - 如何在 JavaScript 中获取当前服务器 UTC 时间

javascript - 如何删除 SELECT2 中选定的值

javascript - RxJS Interval 无延迟

angular - 如何订阅响应式(Reactive) FormGroup 并映射值变化( Angular 6)

javascript - Rx.js 与 Promise 的并发

javascript - RxJS:一个将自己的旧值作为输入的 Observable

javascript - 在 Angular 中使用 Observable 时如何将函数附加到观察者?

typescript - 加载 RxJS Typescript 类型而不从 CommonJS 模块导入?