android - 如何从Retrofit、RxJava、Android中的所有请求发出多个请求和响应

标签 android retrofit2 rx-java rx-android

我有一个场景,我想为多个设备调用相同的 API,并在完成所有请求后显示结果。 我正在使用改造2。 我对 RxJava 知之甚少。我认为 zip 运算符适合此操作。因此实现如下。

ApiInterface 中的 API:

@PUT(AppConstants.BASE_URL + AppConstants.PATH_SEPARATOR + "/user/endpoint")
Observable<ResponseBody> updateInfo(@Header("Authorization") String token, @Query("device_id") String deviceId, @Body JsonObject body);

这里是调用API的方法。它在 Map 中获取设备 ID 及其主体。此方法为 map 中可用的每个设备 ID 调用 API。

public void updateAllInfo(final HashMap<String, String> deviceIdMap, final ApiResponseListener listener) {

    List<Observable<ResponseBody>> requests = new ArrayList<>();
    ArrayList<String> reqIdList = new ArrayList<>();

    for (Map.Entry<String, String> entry : map.entrySet()) {

        String deviceId = entry.getKey();
        String jsonBodyStr = entry.getValue();
        Gson gson = new Gson();
        JsonObject jsonBody = gson.fromJson(jsonBodyStr, JsonObject.class);

        reqIdList.add(deviceId);
        requests.add(apiInterface.updateSchedules("accessToken", deviceId, jsonBody));
    }

    Observable.zip(requests, new Function<Object[], List<ResponseBody>>() {

        @Override
        public List<ResponseBody> apply(Object[] objects) throws Exception {

            Log.e("onSubscribe", "apply : " + objects.length);
            List<ResponseBody> dataResponses = new ArrayList<>();
            for (Object o : objects) {
                dataResponses.add((ResponseBody) o);
            }
            return dataResponses;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<ResponseBody>>() {

                @Override
                public void accept(List<ResponseBody> responseBodies) throws Exception {
                    Log.e("onSubscribe", "YOUR DATA IS HERE: " + responseBodies.size());
                    for (int i = 0; i < responseBodies.size(); i++) {
                        Log.e(TAG, "Response received for " + i + " is : " + responseBodies.get(i).string());
                    }
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e("onSubscribe", "Throwable: " + throwable);
                }
            });
}

我想获取每个设备 ID 的响应(成功/失败)。意味着我需要响应以及调用 API 的 id。 使用 zip 运算符,如果任何 API 失败,则在 accept(Throwable throwable) 方法中收到失败消息。如果任何 API 失败,我认为 zip 运算符(operator)不会调用下一个 API。

如何获得所有请求的响应(成功或失败)?

还需要一些东西来指示响应是针对哪个请求/设备 ID(某些映射)来显示结果。

我可以使用其他运算符来代替 zip 吗?

有什么建议/帮助吗?

最佳答案

我对java有点生疏,所以我会用Kotlin写我的答案,你自己转换应该不成问题。

创建一个帮助程序类,其中包含 ResponseBody 以及 deviceId:

data class IdentifiedResponseBody(
    val deviceId: String, 
    val responseBody: ResponseBody?
)

然后:

// change the signature of your requests list to return IdentifiedResponseBody observables
val requests = mutableListOf<Observable<IdentifiedResponseBody>>()
...
// your stated API have updateInfo instead of updateSchedules, but I will assume they have the same signature
requests.add(
    apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
        .map { responseBody ->
            // map the added observable to return IdentifiedResponseBody
            IdentifiedResponseBody(deviceId, responseBody)
        }
        .onErrorReturn { error -> 
            // return an item here instead of throwing error, so that the other observables will still execute
            IdentifiedResponseBody(deviceId, null)
        }
)

最后,使用merge而不是zip:

Observable.merge(requests)
        .subscribeOn(Schedulers.io())
        // it's a question if you want to observe these on main thread, depends on context of your application
        .subscribe(
            { identifiedResponse ->
                // here you get both the deviceId and the responseBody
                Log.d("RESPNOSE", "deviceId=${identifiedResponse.deviceId}, body=${identifiedResponse.responseBody}")
                if (responseBody == null || responseBody.hasError()) {
                    // request for this deviceId failed, handle it
                }
            },
            { error ->
                Log.e("onSubscribe", "Throwable: " + error)
            }
        )

参见合并:http://reactivex.io/documentation/operators/merge.html

参见zip:http://reactivex.io/documentation/operators/zip.html

您应该看到深刻的区别:zip 将您的响应合并到由映射函数定义的单个项目(即您案例中的响应列表),而 merge 发出所有回复在返回时均单独显示。如果是此处的 zip,则在所有请求完成时(且仅)返回组合结果;您可能不希望出现这种行为,就好像单个请求不会返回响应一样,您根本不会得到任何响应。

更新

Java 等效项应如下所示,但在尝试之前进行修改,因为我不确定我是否正确转换了所有内容:

requests.add(
    apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
                .map(new Function<ResponseBody, IdentifiedResponseBody>() {
                    @Override
                    public IdentifiedResponseBody apply(ResponseBody responseBody) throws Exception {
                        return new IdentifiedResponseBody(deviceId, responseBody);
                    }
                })
                .onErrorReturn(new Function<Throwable, IdentifiedResponseBody>() {
                    @Override
                    public IdentifiedResponseBody apply(Throwable throwable) throws Exception {
                        return new IdentifiedResponseBody(deviceId, null);
                    }
                })
        );

Observable.merge(requests)
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<IdentifiedResponseBody>() {
                           @Override
                           public void accept(IdentifiedResponseBody identifiedResponseBody) throws Exception {
                               // same logic as from kotlin part
                           }
                       },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.e("onSubscribe", "Throwable: " + throwable);
                        }
                    });

更新2

在您提出的评论中:

Is there any way from which I can get final callback for all requests completed

这就是使用 Observable 而不是 Single/Completable 的问题,除非您显式关闭 channel 或出现错误,否则它不会完成被抛出。在理想情况下,Observable 应该用于连续发出一些数据的流,例如 Room DB 的开放 channel ,因为不知道 DB 将更改多少次。我承认,在您的情况下,似乎很难应用 Observable 之外的其他东西。不过有一个解决方法:

Observable.merge(requests)
    // emits only this much items, then close this channel
    .take(requests.size.toLong())
    // executed when the channel is closed or disposed
    .doFinally {
         // todo: finalCallback
     }
    .subscribeOn(Schedulers.io())
    .subscribe(...)

代码又是用Kotlin编写的,转换成java应该不难。请检查 Observable.take() 的作用:http://reactivex.io/documentation/operators/take.html

关于android - 如何从Retrofit、RxJava、Android中的所有请求发出多个请求和响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63500403/

相关文章:

android - 长链中的 rxjava StackOverflowError 异常

java - 如何对满足过滤条件的单声道结果​​执行操作

android - Google Play In-app billing version 3 购买的服务器端验证

android - 在构造函数中使用自己的样式扩展 RadioButton

android - RxJava + Retrofit 2 链接 API 请求

android - 如何使用 Retrofit 刷新 lambda API 生成的 token ?

android - 在 RxJava 中的 TimeoutException 上恢复 Flowable

android - 仅在 xmlns 属性上设置 KSOAP2 前缀

Android 应用程序设计 : write correct intent filter

java - Android Retrofit Post 请求的服务器超时