目前我正在使用 Observable.retry(n) 以便在出现错误时重试。请看一下这段代码:
@Test
public void testObservableRetry() {
Observable
.<String>defer(() -> Observable
.just(Notification.createOnNext("valid value"), Notification.createOnError(new Error("Test")))
.dematerialize())
.doOnNext(v -> System.out.println("onNext: " + v))
.doOnError(e -> System.out.println("onError: " + e.getMessage()))
.doOnSubscribe(d -> System.out.println("onSubscribe"))
.doOnDispose(() -> System.out.println("onDispose"))
.retry(2)
.take(2) // .take(4) will fail the test
.blockingLast();
}
上面的代码能够在错误后恢复,但重试次数仅限于总错误数,而不是连续错误。
例如如果将 .take(2)
替换为 .take(4)
- 测试将失败,因为总错误数将超过 4,尽管事实上在每个错误之后恢复并能够获取下一个值:
---(v1)----(error)---(v2)----(error)----(v3)-----(error)---(v4)-
我想找到一种在成功恢复后重置计数器的方法。用例是例如网络连接 - 我想在每次断开连接时进行相同的尝试次数,但在每次成功连接后我想重置计数器以允许无限的流量,除非每次都能够恢复在恒定的尝试次数内。
编辑:
在此上下文中,成功恢复意味着重试后至少收到 1 项。因此,我只想限制连续错误的数量,而不是总错误数。
最佳答案
这只是 @akarnokd 的解决方案的一点修改(只是删除 nonEmpty
标志)。 @akarnokd - 随意填写以编辑您的解决方案,我将删除此解决方案。
static <T> ObservableTransformer<T, T> retryEmpty(int count) {
return o -> {
AtomicInteger remaining = new AtomicInteger(count);
return o.doOnNext(v -> remaining.lazySet(count))
.retryWhen(err -> err.flatMap(e ->
(remaining.decrementAndGet() == 0)
? Observable.<T>error(e)
: Observable.just(1)));
};
}
关于java - RxJava - 重试 : reset retry counter on successful resume,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48874623/