javascript - RxJS 每 x 秒从数组发出值,使用该值调用函数,如果失败则重试

标签 javascript rxjs rxjs5

我有一个数组,值的类型无关。我想做的是每 x 秒发出一个值,调用具有该值的函数,如果该函数由于某种原因失败,则在 y 秒后重试(可以是一个简单的常量,这里不需要任何增量) .

到目前为止我所拥有的

Rx.Observable
    .interval(500)
    .take(arr.length)
    .map(idx => arr[idx])
    .flatMap(dt => randomFunc(dt))
    .catch(e => conosle.log(e))
    .retry(5)
    .subscribe();

function randomFunc(dt) {
    return Rx.Observable.create(observer => {
        if (dt === 'random') {
            return observer.error(`error`);
        } else {
            return observer.next();
        }
    });
}

这里有两个问题:

1:当randomFunc返回错误时,似乎整个链重新开始。我只需要失败的重试即可。

2:catch实际上从未记录任何错误,即使它似乎在出错时重试。

对于第一个问题,我尝试使用 switchMap 而不是 flatMap,如下所示:

Rx.Observable
    .interval(500)
    .take(arr.length)
    .map(idx => arr[idx])
    .switchMap(dt => randomFunc(dt)
        .catch(e => conosle.log(e))
        .retry(5)
    )
    .subscribe();

这样看来它只重试了失败的,但仍然没有记录任何错误,我什至不确定 switchMap 这里是否好(我真的是一个 Rx 菜鸟) .

如有任何帮助,我们将不胜感激,谢谢!

最佳答案

有几件事需要注意。 retry() 运算符只是重新订阅其源,因此如果您不想再次开始整个迭代,您可以将异步函数合并/连接到链中。

Rx.Observable.from(arr)
  .concatMap(val => {
    let attempts = 0;

    return Rx.Observable.of(val)
      .delay(500)
      .concatMap(val => randomFunc(val)
        .catch((err, caught) => {
          console.log('log error');
          if (attempts++ === 1) {
            return Rx.Observable.of(err);
          } else {
            return caught;
          }
        })
      );

  })
  .subscribe(val => console.log(val));

function randomFunc(dt) {
  return Rx.Observable.create(observer => {
    if (dt === 'random') {
      observer.error(`error received ${dt}`);
    } else {
      observer.next(dt);
      observer.complete();
    }
  });
}

查看现场演示:https://jsbin.com/qacamab/7/edit?js,console

这会打印到控制台:

1
2
3
4
log error
log error
error received random
6
7
8
9
10

catch() 运算符是最重要的部分。它的选择器函数有两个参数:

  • err - 发生的错误
  • caught - 原始的 Observable。

如果我们从选择器函数返回caught,我们将重新订阅源Observable(与retry(1)相同)。由于您想要记录每个错误消息,因此我们必须使用 catch() 而不仅仅是 retry()。通过返回 Rx.Observable.of(err) ,我们进一步传播错误,订阅者将收到下一个通知。我们也可以只返回 Observable.empty() 来简单地忽略错误。

关于javascript - RxJS 每 x 秒从数组发出值,使用该值调用函数,如果失败则重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42329997/

相关文章:

Angular 2 + rxjs - 如何返回通过多个后续 http 请求获取的对象流

typescript - 尝试阻止订阅两次时出现 ObjectUnsubscribedError

javascript - 停止滚动条增长

javascript - 将下拉列表“选项值”输入javascript函数

javascript - Node 异步 - 工作异步方法在 async.parallel 内返回未定义

node.js - 响应拦截器的 CatchError 函数执行多次 - Angular 6

javascript - 从 Node.js 中的 JSON 数组中读取一个值

javascript - 将 Array 序列化为元素,使用 RxJS 进行转换,并将元素组装回数组

angular - 我怎样才能让 typescript 相信可以在 Subject 上调用 Observable 运算符?

javascript - 从一系列 promise 中获取值