javascript - RXJS 链依赖的 observables 顺序,但获取进度条的每个发射

标签 javascript rxjs pipe observable concatmap

我遇到了一个问题,我一直在尝试使用 RxJs 找到解决方案,但似乎找不到适合它的...

  • 我有 3 个不同的 REST 请求,它们将按顺序调用,每个请求都需要前一个的响应作为参数
  • 我想实现一个进度条,随着请求成功而增加

  • 这是我的想法:
  • 我将使用管道和concatMap()避免嵌套订阅,并在前一个请求完成后订阅每个请求。

  • 考虑这个非常简化的版本。假设每个 of代表整个 REST 成功请求(稍后将处理错误),并且我将使用 n 进行未显示的工作范围...

    const request1 = of('success 1').pipe(
      delay(500),
      tap(n => console.log('received ' + n)),
    );
    
    const request2 = (n) => of('success 2').pipe(
      delay(1000),
      tap(n => console.log('received ' + n))
    );
    
    const request3 = (n) => of('success 3').pipe(
      delay(400),
      tap(n => console.log('received ' + n))
    );
    
    request1.pipe(
      concatMap(n => request2(n).pipe(
        concatMap(n => request3(n))
      ))
    )
    

    但是,当我订阅最后一段代码时,我只会得到最后一个请求的响应,这是预期的,因为管道会解决这个问题。

    所以对于 concatMap() ,我可以正确链接我的依赖 REST 调用,但无法跟踪进度。

    虽然我可以通过嵌套订阅很容易地跟踪进度,但我正在努力避免这种情况并使用最佳实践方式。

    我怎样才能链接我的依赖 REST 调用,但每次调用成功时仍然能够做一些事情?

    最佳答案

    这是一个通用的解决方案,虽然不是那么简单。但它确实取得了可观察的进展,同时仍然避免了 share运算符,如果使用不正确,可能会引入意外的状态。

    const chainRequests = (firstRequestFn, ...otherRequestFns) => (
      initialParams
    ) => {
      return otherRequestFns.reduce(
        (chain, nextRequestFn) =>
          chain.pipe(op.concatMap((response) => nextRequestFn(response))),
        firstRequestFn(initialParams)
      );
    };
    
    chainRequests接受可变数量的函数并返回一个接受初始参数的函数并返回一个 concatMaps 的 observable问题中手动显示的功能。它通过将每个函数简化为恰好是可观察的累积值来实现这一点。

    请记住,如果我们知道路径,RxJS 会带领我们走出回调 hell 。
    const chainRequestsWithProgress = (...requestFns) => (initialParams) =>  {
      const progress$ = new Rx.BehaviorSubject(0);
      const wrappedFns = requestFns.map((fn, i) => (...args) =>
        fn(...args).pipe(op.tap(() => progress$.next((i + 1) / requestFns.length)))
      );
      const chain$ = Rx.defer(() => {
        progress$.next(0);
        return chainRequests(...wrappedFns)(initialParams);
      });
      return [chain$, progress$];
    };
    
    chainRequestsWithProgress返回两个 observable - 一个最终发出最后一个响应,一个在订阅第一个 observable 时发出进度值。我们通过创建 BehaviorSubject 来做到这一点。作为我们的进度值流,并包装我们的每个请求函数以返回与通常相同的 observable,但我们也将其通过管道传输到 tap因此它可以将新的进度值推送到 BehaviorSubject .

    每次订阅第一个 observable 时,进度都会归零。

    如果你想返回一个产生进度状态和最终结果值的 observable,你可以有 chainRequestsWithProgress而是返回:
    chain$.pipe(
      op.startWith(null),
      op.combineLatest(progress$, (result, progress) => ({ result, progress }))
    )
    

    并且您将有一个可观察的对象,该对象会发出一个对象,该对象代表最终结果的进度,然后是该结果本身。深思 - 确实 progress$必须只发出数字吗?

    警告

    这假设 request observables 只发出一个值。

    关于javascript - RXJS 链依赖的 observables 顺序,但获取进度条的每个发射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59581191/

    相关文章:

    javascript - RXJS 可观察量在鼠标悬停事件后取消鼠标悬停流

    php -> fwrite 处理管道挂起 -> 为什么?

    bash - 将 mplayer 的输出写入 fifo 并读取

    javascript - 为什么这会不断触发多个警报?

    javascript - 检测对输入(onfocus)事件的关注是元素在 dom 中不存在,但使用 Vanilla (普通)javascript

    Angular SwUpdate 激活 deprication

    linux - 管道打开 '|' 和 '|-' 之间的区别(安全管道打开)

    javascript - 打印多页 html,动态编号页面和页脚

    javascript - 无法让我看似简单的 js 提交表单功能在 WordPress 上工作

    angular - 在另一个 promise 中立即更新该值后,将 from 运算符与 promise 一起使用时,Observable 获得错误的值