我对 rxjava 有一个非常具体的问题或误解,希望有人可以帮助解决。
我正在运行 rxjava 2.1.5 并具有以下代码片段:
public static void main(String[] args) {
final Observable<Object> observable = Observable.create(emitter -> {
// Code ...
});
observable.subscribeOn(Schedulers.io())
.retryWhen(error -> {
System.out.println("retryWhen");
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"));
}
执行后,程序打印:
retryWhen
Process finished with exit code 0
我的问题,我不明白的是:为什么在订阅 Observable 后立即调用 retryWhen ?可观察到的东西什么也不做。
我想要的是在发射器上调用 onError 时调用 retryWhen 。我是否误解了 rx 的工作原理?
谢谢!
添加新片段:
public static void main(String[] args) throws InterruptedException {
final Observable<Object> observable = Observable.create(emitter -> {
emitter.onNext("next");
emitter.onComplete();
});
final CountDownLatch latch = new CountDownLatch(1);
observable.subscribeOn(Schedulers.io())
.doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
.retryWhen(error -> {
System.out.println("retryWhen: " + error.toString());
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"),
() -> latch.countDown());
latch.await();
}
发射器 onNext 和complete被调用。 DoOnError 永远不会被调用。输出为:
重试时间:io.reactivex.subjects.SerializedSubject@35fb3008 订阅下一页
进程已完成,退出代码为 0
最佳答案
retryWhen
当 Observer
订阅时调用提供的函数,因此您有一个主序列以及一个发出 Throwable
的序列主序列失败。您应该在这个 Function
中获得的 Observable
上编写一个逻辑,这样最后,一个 Throwable
将在另一端产生一个值.
Observable.error(new IOException())
.retryWhen(e -> {
System.out.println("Setting up retryWhen");
int[] count = { 0 };
return e
.takeWhile(v -> ++count[0] < 3)
.doOnNext(v -> { System.out.println("Retrying"); });
})
.subscribe(System.out::println, Throwable::printStackTrace);
由于 e -> { }
函数体是针对每个单独订阅者执行的,因此您可以安全地拥有每个订阅者状态,例如重试计数器。
使用 e -> e.retry()
没有任何效果,因为输入错误流永远不会调用其 onError
。
关于java - Rxjava重试时立即调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46926208/