android - 在合并或连接序列中重试 Observable

标签 android rx-java

说我有一些这样的代码

Observable.concat(scores)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribeOn(Schedulers.io())
  .subscribe({...});

由于服务器端不稳定,出现onError()当发出 Observables 和 concat() 之一时,可能会弹出通知(例如错误代码 500)。如文档中所述,运算符将停止发出其余的 Observables。

因此,失败的 Observable 以及其余的 Observable 需要再次发出。

从我的角度来看,我尝试使用toBlocking() operator 将 Observable 序列转换为阻塞 Observable 和 forEach()

List<Observable> list = createListOfScores();
Observable.from(list)
  .toBlocking()
  .forEach(new Action1<Observable>() {
    @Override
    public void call(Observable observable) {
      observable.subscribe(...).onErrorResumeNext(...)
    }
  });

会有比这更好的解决方案。希望有人能启发我。

最佳答案

如果为某些Observable调用了onError,另一种方法可以使用retry方法。但是,在这种情况下,运行时没有任何错误的 Observable 必须从列表中删除,以便它们不会再次运行。这是一个示例(这里我在放弃之前重试运行任务 10 次):

@RunWith(RobolectricTestRunner.class)
@Config(manifest = Config.NONE)
public class RxTest {
    @Test
    public void testRetryObservableConcat() throws Exception {
        final List<Observable<String>> observables = new CopyOnWriteArrayList<>(getObservables());
        Observable.concat(observables)
          //remove the observable if it's successful
          .doOnNext(result -> observables.remove(0))
          .doOnError(error -> System.out.println(error.getMessage()))
          //if an error is thrown then retry the task only for this Observable
          .retry(10, error -> true)
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(result -> System.out.println(result),
            error -> System.out.println("onError called!"));

        Thread.sleep(1000);

        ShadowLooper.runUiThreadTasks();
    }

    private List<Observable<String>> getObservables() {
        List<Observable<String>> observables = new ArrayList<>();
        final Random random = new Random();
        final AtomicInteger atomicInteger = new AtomicInteger(1);
        for (int i = 0; i < 3; i++) {
            final int id = i;
            observables.add(Observable.fromCallable(() -> {
                //for second Observable there's a 2/3 probability that it won't succeed
                if (id == 1 && random.nextInt() % 3 != 0) {
                    throw new RuntimeException("Observable " + id + " failed!");
                }
                return "Observable #" + id + " returns " + atomicInteger.getAndIncrement();
            }));
        }

        return observables;
    }
}

输出

Observable 1 failed!

Observable 1 failed!

Observable 1 failed!

Observable #0 returns 1

Observable #1 returns 2

Observable #2 returns 3

从示例中可以看出,当第二个Observable最终成功后,每个结果都会按顺序传递。

关于android - 在合并或连接序列中重试 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51770640/

相关文章:

android - 如何通过 Android 布局 xml 中的数据绑定(bind)更改背景?

java - Android/Java - Gson 从 JSON 字符串解析为对象 - 日期问题

Android CalendarView 检测月份变化

java - 在返回到订阅者之前,如何拦截一个可观察对象并在 RxJava 中修改它?

java - RxJava : How to extract object from observable?

android - Retrofit 2 + RxJava 将类似的请求干净地链接在一起

javascript - 当用户从列表中选择时如何显示内容详细信息?

android - 从主线程调用的 RxAndroid

android - 是否有可能在 rxjava android 订阅者的 onNext() 中获得 2 个值?

java - 在 Android ListView 适配器中存储字符串变量