说我有一些这样的代码
Observable.concat(scores)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe({...});
由于服务器端不稳定,出现onError()
当发出 Observables 和 concat()
之一时,可能会弹出通知(例如错误代码 500)。如文档中所述,运算符将停止发出其余的 Observables。
因此,失败的 Observable 以及其余的 Observable 需要再次发出。
从我的角度来看,我尝试使用toBlocking() operator
将 Observable 序列转换为阻塞 Observable 和 forEach()
它
List<Observable> list = createListOfScores();
Observable.from(list)
.toBlocking()
.forEach(new Action1<Observable>() {
@Override
public void call(Observable observable) {
observable.subscribe(...).onErrorResumeNext(...)
}
});
会有比这更好的解决方案。希望有人能启发我。
最佳答案
如果为某些Observable
调用了onError
,另一种方法可以使用retry
方法。但是,在这种情况下,运行时没有任何错误的 Observable
必须从列表中删除,以便它们不会再次运行。这是一个示例(这里我在放弃之前重试运行任务 10
次):
@RunWith(RobolectricTestRunner.class)
@Config(manifest = Config.NONE)
public class RxTest {
@Test
public void testRetryObservableConcat() throws Exception {
final List<Observable<String>> observables = new CopyOnWriteArrayList<>(getObservables());
Observable.concat(observables)
//remove the observable if it's successful
.doOnNext(result -> observables.remove(0))
.doOnError(error -> System.out.println(error.getMessage()))
//if an error is thrown then retry the task only for this Observable
.retry(10, error -> true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> System.out.println(result),
error -> System.out.println("onError called!"));
Thread.sleep(1000);
ShadowLooper.runUiThreadTasks();
}
private List<Observable<String>> getObservables() {
List<Observable<String>> observables = new ArrayList<>();
final Random random = new Random();
final AtomicInteger atomicInteger = new AtomicInteger(1);
for (int i = 0; i < 3; i++) {
final int id = i;
observables.add(Observable.fromCallable(() -> {
//for second Observable there's a 2/3 probability that it won't succeed
if (id == 1 && random.nextInt() % 3 != 0) {
throw new RuntimeException("Observable " + id + " failed!");
}
return "Observable #" + id + " returns " + atomicInteger.getAndIncrement();
}));
}
return observables;
}
}
输出
Observable 1 failed!
Observable 1 failed!
Observable 1 failed!
Observable #0 returns 1
Observable #1 returns 2
Observable #2 returns 3
从示例中可以看出,当第二个Observable
最终成功后,每个结果都会按顺序传递。
关于android - 在合并或连接序列中重试 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51770640/