android - How to combine RxJava Single & Completable Retrofit calls in an Android application

Tags: android retrofit2 reactive-programming rx-java2

My current Android application uses Retrofit and RxJava to orchestrate my network calls.

I have modelled my HTTP GETs as Single<Response<String>> and my POSTs as Completable.

The sequence of calls I require is as follows:

Call GET(1), GET(2), GET(3) sequentially

Call POST(1), POST(2) in parallel

When both POST(1) and POST(2) have completed OK, call GET(4).

I have a partial solution. I have coded the calls to the first three GETs, followed by the POST calls.

My code resembles this:

Single.concat(getRequests())
                .subscribeOn(Schedulers.single())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        manageExecutions(combineExecutions());
                    }
                })
                .subscribe();

    /**
     * @return the GET requests, in the order the enum constants are declared
     */
    private static Iterable<Single<Response<String>>> getRequests() {
        final API_CALL_GET[] apiCalls = API_CALL_GET.values();
        final List<Single<Response<String>>> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_GET apiCall : apiCalls) {
            requests.add(apiCall.request());
        }

        return requests;
    }

public enum API_CALL_GET {

    GET_ONE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getOne(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataOne)
                .doOnError(error -> ever(error));
        }
    }, GET_TWO {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getTwo(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataTwo)
                .doOnError(error -> ever(error));
        }
    },
    GET_THREE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getThree(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataThree)
                .doOnError(error -> ever(error));
        }
    };

    public abstract Single<Response<String>> request();

}


    private static Action manageExecutions(final List<Completable> completables) {

        return new Action() {
            @Override
            public void run() throws Exception {
                Completable
                .concat(completables)
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        accumulateAmounts();
                    }
                })
                .subscribe();
            }
        };
    }


    /**
     * @return every POST Completable from all API_CALL_POST constants, flattened into one list
     */
    private static List<Completable> combineExecutions() {
        final API_CALL_POST[] apiCalls = API_CALL_POST.values();
        final List<Completable> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_POST apiCall : apiCalls) {
            requests.addAll(apiCall.requests());
        }

        return Lists.newArrayList(Iterables.unmodifiableIterable(requests));
    }

public enum API_CALL_POST {

    POST_ONE {
        @Override
        public List<Completable> requests() {
            return NetworkController.postRecommenderExecutions();
        }
    },
    POST_TWO {
        @Override
        public List<Completable> requests() {
            return NetworkController.postSavedSearcheExecutions();
        }
    };

    public abstract List<Completable> requests();

}


    public static List<Completable> postONE() {
        final List<Completable> completables = new ArrayList<>();

        final List<OneDO> oneDOS = fetchOnes();

        for (final OneDO oneDO : oneDOS) {
            completables.add(RETRO_SERVICE.runCompletableOnes(authToken, oneDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }




    public static List<Completable> postTWO() {
        final List<Completable> completables = new ArrayList<>();

        final List<TwoDO> twoDOS = fetchTwos();

        for (final TwoDO twoDO : twoDOS) {
            completables.add(RETRO_SERVICE.runCompletableTwos(authToken, twoDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }

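The difficulty I am having is chaining my calls correctly.

For example, I thought I would be able to develop a solution similar to this pseudocode: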

Single.concat(GET_1...GET_N).onComplete(POST_1...POST_N).onComplete(GET_LAST)

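However, my current partial solution only calls the first group of GETs and then the POSTs, and the GET and POST calls are not "chained".

I cannot see how to create a chain of calls that supports my use case.

Is it possible to combine Single -> Completable -> Single in a chained call?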

Update

Based on Daniil's answer, I ended up with this solution:

 Single.concat(getRequests())
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "accept[0000]: ", throwable))
                .ignoreElements()
                .andThen(Completable.merge(combineExecutions()))
                .doOnError(throwable -> Log.e(TAG, "accept: ", throwable))
                .doOnComplete(() -> Controller.accumulateTotals())
                .subscribe();
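
The chain in the update ends after the POSTs, while the sequence described in the question finishes with GET(4). One possible way to close that gap, sketched here under assumptions: `Completable.andThen` also accepts a `Single`, so the last GET can be appended to the same chain. The helpers `get`, `post` and `runChain` are hypothetical stand-ins for the Retrofit calls, not part of the original code.

```java
import io.reactivex.Completable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ChainSketch {

    // Hypothetical stand-in for a Retrofit GET: records its name when it runs.
    static Single<String> get(String name, List<String> log) {
        return Single.fromCallable(() -> {
            log.add(name);
            return name + " body";
        });
    }

    // Hypothetical stand-in for a Retrofit POST modelled as a Completable.
    static Completable post(String name, List<String> log) {
        return Completable.fromAction(() -> log.add(name));
    }

    // GET(1..3) in order, then both POSTs, then GET(4); returns GET(4)'s value.
    static String runChain(List<String> log) {
        List<Single<String>> gets = Arrays.asList(
                get("GET_1", log), get("GET_2", log), get("GET_3", log));
        List<Completable> posts = Arrays.asList(
                post("POST_1", log), post("POST_2", log));

        return Single.concat(gets)                 // Flowable<String>: one GET at a time
                .ignoreElements()                  // Completable: completes once every GET succeeded
                .andThen(Completable.merge(posts)) // subscribes to all POSTs together
                .andThen(get("GET_4", log))        // Single<String>: subscribed after all POSTs complete
                .blockingGet();                    // blocking only to keep the demo simple
    }

    public static void main(String[] args) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        String last = runChain(log);
        System.out.println(log + " -> " + last);
    }
}
```

In this synchronous demo the POSTs run one after the other, because `Completable.merge` only subscribes to all of its sources and does not move them to other threads. With real Retrofit calls, whether the POSTs actually overlap probably depends on how the call adapter is configured; applying `subscribeOn(Schedulers.io())` to each POST individually is one way to let them run concurrently.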

Best answer

In Kotlin it would look like this:

fun generateGetRequests(): List<Single<Response<String>>> {
    return listOf(retrofit.firstGet(), retrofit.secondGet(), ... ,retrofit.lastGet())
}

fun generatePostRequests(): List<Completable> {
    return listOf(retrofit.firstPost(), ..., retrofit.lastPost())
}

fun doSomethingWithResponses(responses: Array<Any>) {
    // Do Something, like save to db
}

fun runRequests() {
    Single.zip(generateGetRequests(), { responses ->
        doSomethingWithResponses(responses)
    }).ignoreElements()
        .andThen(Completable.merge(generatePostRequests()))
        .subscribeOn(Schedulers.io())
        .subscribe()
}
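
One difference between the Kotlin answer and the Java update in the question is worth noting: `Single.zip` subscribes to all of its sources up front, whereas `Single.concat` waits for each source to finish before subscribing to the next. If the GETs must run strictly one after another, as the question asks, `concat` looks like the closer fit. The sketch below illustrates this with timers as hypothetical stand-ins for network calls; `slowGet`, `withConcat` and `withZip` are illustrative names only.

```java
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ConcatVsZipSketch {

    // Hypothetical stand-in for an asynchronous GET that takes about 50 ms.
    static Single<String> slowGet(String name, List<String> log) {
        return Single.timer(50, TimeUnit.MILLISECONDS)
                .map(tick -> name)
                .doOnSubscribe(d -> log.add("subscribe " + name))
                .doOnSuccess(v -> log.add("success " + v));
    }

    // concat: B is subscribed only after A has succeeded.
    static List<String> withConcat() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Single.concat(Arrays.asList(slowGet("A", log), slowGet("B", log)))
                .ignoreElements()
                .blockingAwait();
        return log;
    }

    // zip: A and B are both subscribed before either has produced a value.
    static List<String> withZip() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Single.zip(Arrays.asList(slowGet("A", log), slowGet("B", log)),
                        results -> results.length)
                .blockingGet();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("concat: " + withConcat());
        System.out.println("zip:    " + withZip());
    }
}
```

The `concat` log lists A's subscription and success before B is subscribed at all, while the `zip` log starts with both subscriptions. The order of the two success entries under `zip` is not guaranteed.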

Source: a similar question on Stack Overflow, "android - How to combine RxJava Single & Completable Retrofit calls in an Android application": https://stackoverflow.com/questions/51781595/
