java - 嵌套 RxJava Observables 的正确方法?

标签 java rx-java

我正在使用 RxJava 并嵌套 Observables,如下所示。我想在另一个 observable 内部调用一个 observable,并让外部 observable 发出内部 observable onNext 的结果。它似乎有效,但我不确定这是否是正确的实现,因为我无法找到任何文档来确认。

public Observable<User> updateUser(final String id) {
    return Observable.create(new Observable.OnSubscribe<User>() {
        @Override
        public void call(final Subscriber<? super User> observer) {
                try {
                if (!observer.isUnsubscribed()) {
                    getUser(id).subscribe(new Action1<User>() {
                        @Override
                        public void call(User user) {
                            observer.onNext(user);
                                observer.onCompleted();
                            }
                        });
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
        }
    });
}

public Observable<User> getUser(final String id) {
...
}

最佳答案

当您刚刚进行 Observable 转换时,请避免调用 subscribe,因为您必须注意 @akarnokd 和 @zsxwing 在评论中提到的所有问题。

我还会避免使用 Observable.create,因为创建 OnSubscribe 实现涉及考虑背压以及随之而来的棘手的并发业务。首选 Observable.justObservable.fromObservable.rangeObservable.deferObservable.using(还有更多,请查看 wiki),为了更高级的目的,请实现 SyncOnSubscribe

此代码可能涵盖您的用例:

public Observable<User> updateUser(final String id) {
    return getUser(id).doOnNext(user -> updateUser(user));
}

public void updateUser(User user) {
    //whatever you want here
}

public Observable<User> getUser(final String id) {
    ...
}

关于java - 嵌套 RxJava Observables 的正确方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34744764/

相关文章:

java - 有什么方法可以将 JASPER 报告解析为 JSON 对象..?

Java 返回错误 "Cannot instantiate the type"

android - CompositeDisposable.clear 导致 OkHttp 抛出 java.lang.IllegalStateException : Unbalanced enter/exit

java - 检查 RxJava 中是否有订阅者抛出异常

android - 在 Android 中使用 RxJava 窗口或缓冲区进行批处理?

java - jsf primefaces,更新时显示 itemLabel 而不是 itemvalue

java - 如何在 Checkstyle 中手动插入错误?

java - Apache Tomcat 7 在每次请求时更改 JSESSIONID

java - RxJava : how to handle combineLatest() when one of the streams emits nothing

android - RxJava 1 和 RxJava 2 在同一个项目中