java - RxJava retryWhen 重新订阅传播

标签 java android java-8 retrofit rx-java

我在 Android 应用程序中使用 Retrofit 和 RxJava 进行通信,并且必须处理解析来自看似正常的 HTTP 响应(状态 200 代码)的响应时的错误。

我还实现了一种使用 retryWhen 运算符处理错误的方法,该运算符连接到用户的输入以决定是否重试。这通过重新订阅原始的 Observable 来实现。

我尝试过的第一种方法是这样的:

services.getSomething()
  .map(response -> {
    if (checkBadResponse(response)) {
      throw new RuntimeException("Error on service");
    } else {
      return parseResponse(response);
    }
  }).retryWhen(this::shouldRetry);

这样就不会再次调用服务。 retryWhen 运算符似乎无法重新订阅服务的 Observable

最终的工作是实现另一个不发送 onCompleted 的运算符并将其与 lift 一起使用,如下所示:

public class CheckResponseStatus<T> implements Observable.Operator<ResponsePayload<T>, ResponsePayload<T>> {
    @Override
    public Subscriber<? super ResponsePayload<T>> call(Subscriber<? super ResponsePayload<T>> subscriber) {
        return new Subscriber<ResponsePayload<T>>() {
            private boolean hasError = false;

            @Override
            public void onCompleted() {
                if (!hasError)
                    subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                hasError = true;
                subscriber.onError(e);
            }

            @Override
            public void onNext(ResponsePayload<T> response) {
                if (response.isOk()) {
                    subscriber.onNext(response);
                } else {
                    hasError = true;
                    subscriber.onError(new RuntimeException(response.getMessage()));
                }
            }
        };
    }
}

像这样使用它:

services.getSomething()
  .lift(new CheckResponseStatus())
  .map(response -> parseResponse(response))
  .retryWhen(this::shouldRetry);

这是处理它的正确方法还是有更简单、更好的方法?

最佳答案

这看起来像是 rx-java 实现中的错误。无论如何,从 map 函数中抛出异常是一件坏事,因为该函数应该是纯函数(例如,没有副作用)。你应该在你的情况下使用 flatMap 运算符:

services.getSomething()
  .flatMap(response -> {
    if (checkBadResponse(response)) {
      return Observable.<ResponseType>error(new RuntimeException("Error on service"));
    } else {
      return Observable.<ResponseType>just(parseResponse(response);
    }
  }).retryWhen(this::shouldRetry);

上面的代码按预期工作并且在发生错误时真正重试请求。

关于java - RxJava retryWhen 重新订阅传播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30219411/

相关文章:

java - 如何干燥内部 Activity

java - 我的字符串反向程序有问题

java - 即使预期结果正确,JUnit 测试也会失败

android - 在代码中旋转一个按钮(或里面的文字)

java - Java 8 中的非干扰示例

java - Hazelcast 中的优先级阻塞队列

android - future 的 Android 设备是否仍支持 OpenGL ES 1.x?

java - 如何使用单个 java 类在多个 View 之间切换?

java - 当键/值映射函数具有共同的计算步骤时,将 Stream 转换为 Map

java - 将防御副本转换为流仍然是线程安全的吗?