java - RxJava - 重试 : reset retry counter on successful resume

标签 java rx-java rx-java2

目前我正在使用 Observable.retry(n) 以便在出现错误时重试。请看一下这段代码:

@Test
public void testObservableRetry() {
    Observable
            .<String>defer(() -> Observable
                    .just(Notification.createOnNext("valid value"), Notification.createOnError(new Error("Test")))
                    .dematerialize())
            .doOnNext(v -> System.out.println("onNext: " + v))
            .doOnError(e -> System.out.println("onError: " + e.getMessage()))
            .doOnSubscribe(d -> System.out.println("onSubscribe"))
            .doOnDispose(() -> System.out.println("onDispose"))
            .retry(2)
            .take(2) // .take(4) will fail the test
            .blockingLast();
}

上面的代码能够在错误后恢复,但重试次数仅限于总错误数,而不是连续错误。

例如如果将 .take(2) 替换为 .take(4) - 测试将失败,因为总错误数将超过 4,尽管事实上在每个错误之后恢复并能够获取下一个值:

---(v1)----(error)---(v2)----(error)----(v3)-----(error)---(v4)-

我想找到一种在成功恢复后重置计数器的方法。用例是例如网络连接 - 我想在每次断开连接时进行相同的尝试次数,但在每次成功连接后我想重置计数器以允许无限的流量,除非每次都能够恢复在恒定的尝试次数内。

编辑:

在此上下文中,成功恢复意味着重试后至少收到 1 项。因此,我只想限制连续错误的数量,而不是总错误数。

最佳答案

这只是 @akarnokd 的解决方案的一点修改(只是删除 nonEmpty 标志)。 @akarnokd - 随意填写以编辑您的解决方案,我将删除此解决方案。

static <T> ObservableTransformer<T, T> retryEmpty(int count) {
    return o -> {
        AtomicInteger remaining = new AtomicInteger(count);
        return o.doOnNext(v -> remaining.lazySet(count))
                .retryWhen(err -> err.flatMap(e ->
                                (remaining.decrementAndGet() == 0)
                                        ? Observable.<T>error(e)
                                        : Observable.just(1)));
    };
}

关于java - RxJava - 重试 : reset retry counter on successful resume,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48874623/

相关文章:

java - 在 eclipse 项目中创建一个新文件作为 IResource

java - 如何使用 Java 获取 div 标签包含的文本?

android - 当与主题反跳时多次进行改造调用

rx-java - RX Java 是服务器端工程师需要的东西吗?

java - 避免服务类之间的紧耦合

java - 如何在 Controller 中使用 @RequestParam 获取 Map<String, String[]> 数据?

android - RxAndroid消息队列超时

java - 创建 Observables 树并添加订阅者作为叶子

kotlin - livedata postvalue是否需要observeOn?

compiler-errors - 带有 RxJava Single<List<T>> 编译错误的 Android Room