java - RxJava 2 - 如何在错误时取消无限流并处理它?

标签 java rx-java rx-java2

我有以下无限流,每秒都会执行一些操作。
我想要的是在出现错误时停止流并处理它。
我怎样才能做到这一点?

void doSomething() {
        Disposable disposable = execute(doSomethingInner(), 0L, TimeUnit.SECONDS, schedulerProvider.io(), someClass -> 1L).doOnError
                (throwable -> {
            Timber.e(throwable, "error happened");// Never triggered
        })
                .doOnNext(someClass -> Timber.i("doing the infinite stuff"))
                .subscribe(Functions.emptyConsumer(), throwable -> {
                    Timber.e(throwable, "stop doing the infinite stuff");// Never triggered
                });
    }

    Observable<SomeClass> doSomethingInner() {
        return Observable.error(new Exception("something went wrong"));
    }

    Observable<SomeClass> execute(Observable<SomeClass> source,
                                  long delayInterval,
                                  TimeUnit timeUnit,
                                  Scheduler scheduler,
                                  Function<SomeClass, Long> interval) {
        return Observable.defer(new Callable<ObservableSource<SomeClass>>() {
            long currentInterval = delayInterval;

            @Override
            public ObservableSource<SomeClass> call() {
                return Single.timer(currentInterval, timeUnit, scheduler)
                        .flatMapObservable(o -> source)
                        .doOnNext(t -> currentInterval = interval.apply(t));
            }
        })
                .repeat()
                .retry();
    }

最佳答案

我认为retry()正在消耗你的错误。
尝试以下任一操作:

  • 删除此 retry()完全
  • 或将其更改为 retry(Predicate<Throwable>)决定是否重复。

如果您不提前使用流,订阅者的默认行为是在错误时取消流,并且您应该收到 onError() 的回调。里面subscribe() .

关于java - RxJava 2 - 如何在错误时取消无限流并处理它?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51187121/

相关文章:

java - 在 intellij : how can i see the all thrown exceptions? 中调试多线程

java - 在java中从客户端发送到服务器以及从服务器发送到客户端

android - Observer.onError 触发不一致

java - RxJava线程切换

mvvm - RxJava - 主题中的错误处理

rx-java - 如何从 GroupedObservable 中获取值(value)?

java - RxJava 2.0 - 如何结合 Observable 和 Completable

java - 字符串数组在项目文本后放置一个逗号

java - 如何获取或创建新的 map 条目

java - 如何计数并阻止在另一个线程上执行的 Observable?