我有一个数组,值的类型无关。我想做的是每 x 秒发出一个值,调用具有该值的函数,如果该函数由于某种原因失败,则在 y 秒后重试(可以是一个简单的常量,这里不需要任何增量) .
到目前为止我所拥有的
Rx.Observable
.interval(500)
.take(arr.length)
.map(idx => arr[idx])
.flatMap(dt => randomFunc(dt))
.catch(e => conosle.log(e))
.retry(5)
.subscribe();
function randomFunc(dt) {
return Rx.Observable.create(observer => {
if (dt === 'random') {
return observer.error(`error`);
} else {
return observer.next();
}
});
}
这里有两个问题:
1:当randomFunc
返回错误时,似乎整个链重新开始。我只需要失败的重试即可。
2:catch
实际上从未记录任何错误,即使它似乎在出错时重试。
对于第一个问题,我尝试使用 switchMap
而不是 flatMap
,如下所示:
Rx.Observable
.interval(500)
.take(arr.length)
.map(idx => arr[idx])
.switchMap(dt => randomFunc(dt)
.catch(e => conosle.log(e))
.retry(5)
)
.subscribe();
这样看来它只重试了失败的,但仍然没有记录任何错误,我什至不确定 switchMap
这里是否好(我真的是一个 Rx 菜鸟) .
如有任何帮助,我们将不胜感激,谢谢!
最佳答案
有几件事需要注意。 retry()
运算符只是重新订阅其源,因此如果您不想再次开始整个迭代,您可以将异步函数合并/连接到链中。
Rx.Observable.from(arr)
.concatMap(val => {
let attempts = 0;
return Rx.Observable.of(val)
.delay(500)
.concatMap(val => randomFunc(val)
.catch((err, caught) => {
console.log('log error');
if (attempts++ === 1) {
return Rx.Observable.of(err);
} else {
return caught;
}
})
);
})
.subscribe(val => console.log(val));
function randomFunc(dt) {
return Rx.Observable.create(observer => {
if (dt === 'random') {
observer.error(`error received ${dt}`);
} else {
observer.next(dt);
observer.complete();
}
});
}
查看现场演示:https://jsbin.com/qacamab/7/edit?js,console
这会打印到控制台:
1
2
3
4
log error
log error
error received random
6
7
8
9
10
catch()
运算符是最重要的部分。它的选择器函数有两个参数:
err
- 发生的错误caught
- 原始的 Observable。
如果我们从选择器函数返回caught
,我们将重新订阅源Observable(与retry(1)
相同)。由于您想要记录每个错误消息,因此我们必须使用 catch()
而不仅仅是 retry()
。通过返回 Rx.Observable.of(err) ,我们进一步传播错误,订阅者将收到下一个通知。我们也可以只返回 Observable.empty() 来简单地忽略错误。
关于javascript - RxJS 每 x 秒从数组发出值,使用该值调用函数,如果失败则重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42329997/