java - 多个 observable 完成后做某事

标签 java android rx-java reactive-programming rx-android

我正在 Android 上使用 RXJava,并尝试将多个 API 调用链接在一起,并在两个 API 调用完成后执行某些操作。我的 API 调用看起来都与提供的代码示例类似。基本上进行API调用,在onNext中将每条记录写入数据库,并在所有记录都写入后,更新一些缓存。我想异步触发这两个调用,然后在两个调用都达到 onCompleted 后,再执行其他操作。 RX 中执行此操作的正确方法是什么?我认为我不需要 zip,因为我不需要将不同的流 bundle 在一起。我在想也许可以合并,但是我的两个 API 调用返回了不同类型的 Observable。请告诉我。谢谢。

    getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .subscribe(new Subscriber<User>() {
                @Override
                public void onCompleted() {
                   updateUserCache();
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "Error loading users", e);
                }

                @Override
                public void onNext(User user) {
                    insertUserToDB(user);
                }
            });

    getLocations()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .subscribe(new Subscriber<Location>() {
                @Override
                public void onCompleted() {
                   updateLocationCache();
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "Error loading Locations", e);
                }

                @Override
                public void onNext(Location location) {
                    insertLocationToDB(location);
                }
            });  

最佳答案

你的想法是正确的。您应该使用 zip 运算符。

您的每个函数都应该进行调用、写入数据库并执行您需要的所有操作。 Theat zip 输出功能有所不同:当调用它时,您可以确定所有 Observable 已成功完成 -> 只需完成您的 react 流。

创建一个Observable列表:

List<Observable<?>> observableList = new ArrayList<>();
observableList.add(
        getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .insertUserToDB(user)
            .toList());

observableList.add(
        getLocations()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .insertLocationToDB(location)
            .toList());

然后zip所有Observable的:

Observable.zip(observableList, new FuncN<Object, Observable<?>>() {
    @Override
    public Observable<?> call(Object... args) {
        return Observable.empty();
    }
}).subscribe(new Subscriber<Object>() {
    @Override
    public void onCompleted() {
        updateUserCache();
        updateLocationCache();
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Object o) {

    }
});

这是伪代码,但我希望你能理解这个想法。

关于java - 多个 observable 完成后做某事,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40167433/

相关文章:

java - 支柱错误: The requested resource is not available

java - 错误帮助 java.net.SocketException : Connection reset

java - 文本文件内容更改事件

android - 呈现不规则按钮形状 - 最佳实践

android - 它需要指定 NormalizationOptions 元数据来预处理输入图像

java - 按需执行热 Observable

java - controlfx 对话框用法将对话框包含在 catch block 中

java - 安卓错误 : Cannot perform this operation because the connection pool has been closed

java - 多次订阅 RxJava observable

mysql - Reactive Streams Specification 1.0 发布后,jdbc 规范是否也会响应式?