java - Rxjava重试时立即调用

标签 java rx-java

我对 rxjava 有一个非常具体的问题或误解,希望有人可以帮助解决。

我正在运行 rxjava 2.1.5 并具有以下代码片段:

public static void main(String[] args) {

    final Observable<Object> observable = Observable.create(emitter -> {
        // Code ... 
    });

    observable.subscribeOn(Schedulers.io())
            .retryWhen(error -> {
                System.out.println("retryWhen");
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"));

}

执行后,程序打印:

retryWhen

Process finished with exit code 0

我的问题,我不明白的是:为什么在订阅 Observable 后立即调用 retryWhen ?可观察到的东西什么也不做。

我想要的是在发射器上调用 onError 时调用 retryWhen 。我是否误解了 rx 的工作原理?

谢谢!

添加新片段:

public static void main(String[] args) throws InterruptedException {

    final Observable<Object> observable = Observable.create(emitter -> {
        emitter.onNext("next");
        emitter.onComplete();
    });

    final CountDownLatch latch = new CountDownLatch(1);
    observable.subscribeOn(Schedulers.io())
            .doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
            .retryWhen(error -> {
                System.out.println("retryWhen: " + error.toString());
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"),
                         () -> latch.countDown());

    latch.await();
}

发射器 onNext 和complete被调用。 DoOnError 永远不会被调用。输出为:

重试时间:io.reactivex.subjects.SerializedSubject@35fb3008 订阅下一页

进程已完成,退出代码为 0

最佳答案

retryWhenObserver 订阅时调用提供的函数,因此您有一个主序列以及一个发出 Throwable 的序列主序列失败。您应该在这个 Function 中获得的 Observable 上编写一个逻辑,这样最后,一个 Throwable 将在另一端产生一个值.

Observable.error(new IOException())
    .retryWhen(e -> {
         System.out.println("Setting up retryWhen");
         int[] count = { 0 };
         return e
            .takeWhile(v -> ++count[0] < 3)
            .doOnNext(v -> { System.out.println("Retrying"); });
    })
    .subscribe(System.out::println, Throwable::printStackTrace);

由于 e -> { } 函数体是针对每个单独订阅者执行的,因此您可以安全地拥有每个订阅者状态,例如重试计数器。

使用 e -> e.retry() 没有任何效果,因为输入错误流永远不会调用其 onError

关于java - Rxjava重试时立即调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46926208/

相关文章:

java - 在 Java 中使用递归泛型时不兼容的类型

java - 如何在 Java 中的 GridLayout 面板的特定单元格中添加标签?

java - 权限拒绝 : opening provider android. support.v4.content.FileProvider

java - 为什么 RxJava Observable.doOnDispose 会触发两次?

java - 如何使用 RxJava 发出错误然后发出缓存数据?

rx-java - 在 0.18 中,后台任务如何在调度程序上执行?

java - Swift 的 CharacterSet.decimalDigits 和 unicodeScalars 在 Kotlin 中的等价物是什么

java - Spring Boot - “Error creating bean with name 'entityManagerFactory'” - 启动

android - io.reactivex.exceptions.UndeliverableException : java. io.InterruptedIOException

java - 如何将异步 HTTP 请求抽象为同步请求