java - RxJava : Conditionally catch error and stop propagation

标签 java error-handling rx-java observable retrofit2

我将 Retrofit 与 RxJava Observables 和 lambda 表达式结合使用。我是 RxJava 的新手,不知道如何执行以下操作:

Observable<ResponseBody> res = api.getXyz();
res.subscribe(response -> {
    // I don't need the response here
}, error -> {
    // I might be able to handle an error here. If so, it shall not go to the second error handler.
});
res.subscribe(response -> {
    // This is where I want to process the response
}, error -> {
    // This error handler shall only be invoked if the first error handler was not able to handle the error.
});

我查看了 error handling operators ,但我不明白他们如何帮助我处理我的用例。

最佳答案

方法一:保留两个Subscriber但是cache Observable .

一切保持原样,只是把第一行改成:

Observable<ResponseBody> res = api.getXyz().cache();

cache将确保请求仅发送一次,但同时发送一次 Subscriber s 得到所有相同的事件。

这样你是否以及如何处理第一个Subscriber中的错误不影响什么秒Subscriber看见了。

方法 2:使用 onErrorResumeNext 捕获一些错误但转发所有其他人。

添加onErrorResumeNext给你的Observable产生这样的东西(在“内部”对象中):

Observable observable = Observable.error(new IllegalStateException())
.onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
    @Override
    public Observable<?> call(Throwable throwable) {
        if (throwable instanceof NumberFormatException) {
            System.out.println("NFE - handled");
            return Observable.empty();
        } else {
            System.out.println("Some other exception - panic!");
            return Observable.error(throwable);
        }
    }
});

并且只订阅一次(在“外部”对象中):

observable.subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
        e.printStackTrace();
    }

    @Override
    public void onNext(Object o) {
        System.out.println(String.format("onNext: %s", String.valueOf(o)));
    }    

});

这样,只有在 onErrorResumeNext 中无法处理错误时才会转发错误- 如果可以的话,Subscriber只会接到 onCompleted 的电话没有别的。

onErrorResumeNext 中有副作用不过,这让我有点不舒服。 :-)

编辑:哦,如果您想更加严格,可以使用方法 3:将每个案例包装在一个新对象中。

public abstract class ResultOrError<T> {
}

public final class Result<T> extends ResultOrError<T> {
    public final T result;

    public Result(T result) {
        this.result = result;
    }
}

public final class HandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

public final class UnhandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

然后:

  • 将正确的结果包装在 Result 中(使用 map )
  • 将可处理的错误包装在 HandledError 中和
  • UnhandledError 中无法处理的错误(使用 onErrorResumeNextif 子句)
  • 处理HandledError小号(使用 doOnError )
  • 有一个Subscriber<ResultOrError<ResponseBody>> - 它会收到所有三种类型的通知 ( onNext ),但只会忽略 HandledError s 并处理其他两种类型。

关于java - RxJava : Conditionally catch error and stop propagation,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35618396/

相关文章:

java - 我需要Docker容器中的任何服务器来运行Java应用程序吗?

Java 字符串索引超出范围 : 0

php - PHP异常真的比错误有用吗? (进阶)

Powershell - 添加到 $Error

java - 如何同时调用所有 `Mono<E>`

java - 需要帮助理解方法中的数组 - java

java - 使用 TreeMap(21.9- Java 简介,Liang,第 10 版)

java - ANTLR 错误 : Decision can match input using multiple alternatives

java - RxJava 2.0 - 处理 publish().refCount() 中未捕获订阅者错误的资源

android - 如何在没有丑陋的 instanceof 的情况下处理 Retrofit Rx onError 中的不同类型的错误