android - 使用回调/监听器链接 RxJava 可观察对象

标签 android reactive-programming rx-java observable rx-android

我正在将 Retrofit 与 Observables 结合使用,并希望链接这些 observables。通常它与 map()flatMap() 等函数配合使用效果很好,因为 api 会返回一个执行任务的 Observable。但在这种情况下,我必须执行以下操作:

  1. getKey() 来自 api
  2. 在另一个库 Foo 中使用返回的 key 并等待调用回调。
  3. 当回调返回时,将结果发送到api

我希望这是一个单链调用,这样我只需订阅一次。我猜我可以使用 merge()join() 或其他东西,但不确定处理回调的最佳方法是什么。

有没有办法让它变得更好?这是我目前所拥有的:

api.getKey().subscribe(new Action1<String>() {
   @Override
   public void call(String key) {
      Foo foo = new Foo();
      foo.setAwesomeCallback(new AwesomeCallback() {
         @Override
         public void onAwesomeReady(String awesome) {
            api.sendAwesome(awesome)
                    .subscribe(new Action1<Void>() {
                       @Override
                       public void call(Void aVoid) {
                           handleAwesomeSent();
                       }
                    });
         }
      });
      foo.makeAwesome();
   }
});

最佳答案

采用 clemp6r 的解决方案,这是另一个既不需要 Subjects 也不需要嵌套 Subscriptions 的解决方案:

api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {

        return Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(final Subscriber<? super String> subscriber) {
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        if (! subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onComplete();
                        }
                    }
                });
                foo.makeAwesome();
            } 
        });
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String awesome) {
        return sendAwesome(awesome);
   }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});

一般来说,我认为总是可以使用 Observable.create() 将任何基于回调的异步操作包装在 Observable 中。

关于android - 使用回调/监听器链接 RxJava 可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29679801/

相关文章:

android - 如何在 Android 应用程序中结合 RxJava Single & Completable Retrofit 调用

java - 使用具有动态参数的 Mono 方法

rx-java - RxJava : execute a list of Single in parallel and get the results in a list in the same order

java - 如何断言 Completable 是否已被订阅/完成 (RxJava2)

android - 用红色替换位图中的黑色

android - 整合我的 Android 应用程序中的 Activity 数量

java - eclipse 中缺少 "Annotation processing"菜单

android - 如何使用 SimpleCursorAdapter 将数据库中的特定元素加粗

javascript - 如何对 rxjs5 进行单元测试?

android - 如何在没有丑陋的 instanceof 的情况下处理 Retrofit Rx onError 中的不同类型的错误