javascript - 继续使用 mergeMap 在 RxJs pipeable 中出错

标签 javascript node.js functional-programming rxjs rxjs-pipeable-operators

我正在使用 RxJs 管道和 mergeMap 运算符进行一些并行 HTTP 获取。

在第一个请求失败时(假设/urlnotexists 抛出 404 错误)它会停止所有其他请求。

我希望它继续查询所有剩余的 url,而不是为这个失败的请求调用所有剩余的 mergeMap。

我尝试使用 RxJs 中的 throwError 和 catchError 但没有成功。

索引.js

const { from } = require('rxjs');
const { mergeMap, scan } = require('rxjs/operators');

const request = {
  get: url => {
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        if (url === '/urlnotexists') { return reject(new Error(url)); }
        return resolve(url);
      }, 1000);
    });
  }
};

(async function() {
  await from([
    '/urlexists',
    '/urlnotexists',
    '/urlexists2',
    '/urlexists3',
  ])
    .pipe(
      mergeMap(async url => {
        try {
          console.log('mergeMap 1:', url);
          const val = await request.get(url);
          return val;
        } catch(err) {
          console.log('err:', err.message);
          // a throw here prevent all remaining request.get() to be tried
        }
      }),
      mergeMap(async val => {
        // should not pass here if previous request.get() failed 
        console.log('mergeMap 2:', val);
        return val;
      }),
      scan((acc, val) => {
        // should not pass here if previous request.get() failed 
        acc.push(val);
        return acc;
      }, []),
    )
    .toPromise()
    .then(merged => {
      // should have merged /urlexists, /urlexists2 and /urlexists3
      // even if /urlnotexists failed
      console.log('merged:', merged);
    })
    .catch(err => {
      console.log('catched err:', err);
    });
})();
$ node index.js
mergeMap 1: /urlexists
mergeMap 1: /urlnotexists
mergeMap 1: /urlexists2
mergeMap 1: /urlexists3
err: /urlnotexists
mergeMap 2: /urlexists
mergeMap 2: undefined <- I didn't wanted this mergeMap to have been called
mergeMap 2: /urlexists2
mergeMap 2: /urlexists3
merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]

我希望发出并发 GET 请求并在最后减少一个对象中它们各自的值。

但如果发生错误,我希望他们不要打断我的管道,而是记录它们。

有什么建议吗?

最佳答案

如果您想使用 RxJS,您应该在使用 forkJoin 并发执行所有请求之前,使用 catchError 添加错误处理,并将任何其他任务添加到单个请求。

const { of, from, forkJoin } = rxjs;
const { catchError, tap } = rxjs.operators;

// your promise factory, unchanged (just shorter)
const request = {
  get: url => {
    return new Promise((resolve, reject) => setTimeout(
      () => url === '/urlnotexists' ? reject(new Error(url)) : resolve(url), 1000
    ));
  }
};

// a single rxjs request with error handling
const fetch$ = url => {
  console.log('before:', url);
  return from(request.get(url)).pipe(
    // add any additional operator that should be executed for each request here
    tap(val => console.log('after:', val)),
    catchError(error => {
      console.log('err:', error.message);
      return of(undefined);
    })
  );
};

// concurrently executed rxjs requests
forkJoin(["/urlexists", "/urlnotexists", "/urlexists2", "/urlexists3"].map(fetch$))
  .subscribe(merged => console.log("merged:", merged));
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

关于javascript - 继续使用 mergeMap 在 RxJs pipeable 中出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58275508/

相关文章:

javascript - 使用javascript检测div外部的点击

javascript - Nodejs mongodb 动态集合名称

javascript - 函数式编程将 Monad 数组转换为数组的单个 Monad

java - 如何处理一个流畅的界面,其中每个步骤都可以是终端操作?

php - 任何关于 PHP 中的 V8JS 的文档?

javascript - 如何编写一个不可点击的链接,在点击时运行 javascript 函数?

javascript - 如何从 WinJS 返回一个值

node.js - 如何在通过 Vuejs 调用时使 Nodejs 重定向工作

node.js - npm 过时的全局包 : How to actually update a package?

haskell - 什么是 super 组合子和常量应用形式?