android - 支持 RxJava 的链式改造服务

标签 android square retrofit netflix rx-java

我在使用 retrofit 的 RxJava 支持链接可观察对象时遇到了问题。我可能误解了如何使用它,否则它可能是改造中的一个错误。希望这里有人可以帮助我了解发生了什么。 编辑:我正在为这些响应使用 MockRestAdapter - 这可能是相关的,因为我看到 RxSupport 实现略有不同。

这是一个虚假的银行应用程序。它正在尝试进行转帐,在转帐完成后,它应该执行帐户请求以更新帐户值。这基本上只是我尝试 flatMap 的借口。不幸的是,以下代码不起作用,没有订阅者收到通知:

案例 1:链接两个改造产生的可观察对象

传输服务(注意:返回改造产生的 observable):

@FormUrlEncoded @POST("/user/transactions/")
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,
                                             @Field("from_account_number") String fromAccountNumber,
                                             @Field("to_account_number") String toAccountNumber,
                                             @Field("amount") String amount);

账户服务(注意:返回改造产生的可观察对象):

@FormUrlEncoded @POST("/user/accounts")
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);

将两个改造产生的可观察对象链接在一起:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return accountsService.getAccounts(session.getSessionId());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

案例 2:创建我自己的 observable 并与 retrofit 生产的链接

如果我忽略 Retrofit 中对“平面映射”调用的内置 Rx 支持,它会完美运行!所有订阅者都会收到通知。见下文:

新账户服务(注意:不产生可观察的):

@FormUrlEncoded @POST("/user/accounts")
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);

创建我自己的可观察对象并自己发出项目:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return Observable.create(new Observable.OnSubscribe<List<Account>>() {
                        @Override public void call(Subscriber<? super List<Account>> subscriber) {
                            List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
                            subscriber.onNext(accounts);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

任何帮助将不胜感激!

最佳答案

答案是肯定的,您应该能够从 Retrofit 链接可观察对象。 MockRestAdapter$MockRxSupport:createMockObservable 私有(private)类中似乎存在错误。将订阅者订阅到可观察对象的调度方式似乎是错误的。在 HttpExecutor 线程本身启动之后订阅可观察对象。我相信来自 Schedulers.io() 线程的原始流程在 mockHandler.invokeSync 返回的 Observable 可以订阅之前完成并取消订阅。如果您看一下 retrofit-mock 模块中的代码,希望这个解释能有所帮助。

作为当前代码的解决方法,当仅使用 retrofit-mock 时,您可以用您自己的 ImmediateExecutor 实现替换内部默认 Executor。这将至少允许在测试模拟时具有单个线程流,该线程流将由您的 Schedulers.io 提供。

// ImmediateExecutor.java
public class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

// Create your RestAdapter with your ImmdiateExecutor
RestAdapter adapter = new RestAdapter.Builder()
            .setEndpoint(endpoint)
            .setExecutors(new ImmediateExecutor(), null)
            .build();

至于在源代码中解决问题,您还可以将 retrofit-mock 项目作为源代码包含在您的项目中,并使用以下代码修改 MockRestAdapter$MockRxSupport:createMockObservable 方法。我已经测试了您的用例,它确实解决了问题。

--- MockRestAdapter.java$MockRxSupport ----

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
        final RequestInterceptor interceptor, final Object[] args) {
      return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override public void call(final Subscriber<? super Object> subscriber) {
          try {
            if (subscriber.isUnsubscribed()) return;
            Observable observable =
                (Observable) mockHandler.invokeSync(methodInfo, interceptor, args);

            observable.subscribeOn(Schedulers.from(httpExecutor));

            //noinspection unchecked
            observable.subscribe(subscriber);

          } catch (RetrofitError e) {
            subscriber.onError(errorHandler.handleError(e));
          } catch (Throwable e) {
            subscriber.onError(e);
          }
        }
      });
    }

在 Retrofit 项目中创建了一个问题 here ,我们会看看他们是否接受。

关于android - 支持 RxJava 的链式改造服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24748656/

相关文章:

android - android studio 中的 ADB 不断显示相同的错误消息并且无法启动

java - 在不使用递归的情况下,如何让我的方格在彼此之后而不是在彼此之上生成?

java - 将变量应用于 Retrofit HTTP 方法注释

python - Numpy square 返回数组的错误值

javascript - 在(Phonegap 应用程序)中构建不填充 IOS 设备上的字段的方形表单构建

android - 如何在 OkHttp 3 中为特定服务设置不同的连接超时?

java - 使用 Retrofit 时,当回复有时是对象有时是数组时,如何解析 JSON 回复?

android - 如何在monodroid中获取Camerapreview数据

android - 如何更改操作栏上菜单项的位置

android - 如何将 View 转换为Drawable