android - 如何在错误情况下非阻塞地组合多个 RxJava 链

标签 android rx-java

我的要求:

  • N 个 Retrofit 并行调用
  • 等待所有调用完成(成功或失败)
  • 如果 k (0<= k < N) 次调用失败,它们不应阻止其他调用。想象一下,失败的调用可以返回 null 或错误对象,而成功的调用返回 ResponseBody 对象。
  • 可选:如果 RxJava 有一种简单的方法来区分每次成功或失败的调用,那很好,如果没有,我将解析响应并自己弄清楚

我有什么:

        Observable<ResponseBody> api1Call = api1.fetchData();
        Observable<ResponseBody> api2Call = api2.fetchData();
        Observable<ResponseBody> api3Call = api3.fetchData();

        Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
            @Override
            public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
                Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
                return null;
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                Logger.e(throwable, "some error with one of the apis?");
                return Observable.empty();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onCompleted() {
                        Logger.i("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Logger.e(e, "onError");
                    }

                    @Override
                    public void onNext(Object o) {
                        Logger.i("onNext " + o);
                    }
                });

我得到的输出:

some error with one of the apis?
// stacktrace of the error
onCompleted

我是 RxJava 的新手,非常困惑。我在 StackOverflow 上找到了一些答案,说 zip 做了类似的事情,但它离我的要求更远。我猜其中一个“组合”运算符 + 适当的异常处理将满足我的需要。到目前为止真的很难弄明白

我使用的版本:

compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'

最佳答案

  1. 您无法通过combineLastzip 实现并行,rxjava 将在我的测试中按顺序执行和发出您的项目。

  2. 如果您的任务之一失败,您的 Func2#call 将不会被调用,而是提交 onError。你甚至无法通过这种方式获得其他成功任务的结果。

  3. 解决方案是flatMap,这是rxjava中实现并发的传统方式。它还满足您的其他要求。

这是一个小而完整的例子。

我使用一个简单的网站服务来测试。

我用一个Semaphore 来等待所有任务完成,你完全可以忽略它。并且我在 http 请求中添加了日志记录以便更好地理解,您也可以完全忽略它。

public interface WebsiteService {

    @GET
    Observable<ResponseBody> website(@Url String url);

}

然后我用下面的代码用rxjava测试结果。

   HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
    loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);

    Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
            .build();
    WebsiteService websiteService = retrofit.create(WebsiteService.class);

    final Semaphore s = new Semaphore(1);
    try {
        s.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    Observable<ResponseBody> first = websiteService.website("http://github.com");
    Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
    Observable<ResponseBody> third = websiteService.website("http://notexisting.com");

    final int numberOfCalls = 3; // testing for three calls

    Observable.just(first, second, third)
            .flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
                @Override
                public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
                    return responseBodyObservable.subscribeOn(Schedulers.computation());
                }
            })
            .subscribeOn(Schedulers.computation())
            .subscribe(new Observer<ResponseBody>() {

                private int currentDoneCalls = 0;

                private void checkShouldReleaseSemaphore() {
                    if (currentDoneCalls >= numberOfCalls) {
                        s.release();
                    }
                }

                @Override
                public void onSubscribe(@NonNull Disposable d) {

                }

                @Override
                public void onNext(@NonNull ResponseBody responseBody) {
                    System.out.println("Retrofit call success " + responseBody.contentType());
                    synchronized (this) {
                        currentDoneCalls++;
                    }
                    checkShouldReleaseSemaphore();
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println("Retrofit call failed " + e.getMessage());
                    synchronized (this) {
                        currentDoneCalls++;
                    }
                    checkShouldReleaseSemaphore();
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete, All request success");
                    checkShouldReleaseSemaphore();
                }

            });

    try {
        s.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("All request done");
        s.release();
    }

我使用 rxjava2 并改造 adapter-rxjava2 进行测试。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'

已更新

RxJava2的介绍页来自github指出了实现并行的实用方法。

Practically, paralellism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this...

虽然这个例子是基于RxJava2,但是操作flatMap是 已经存在于 RxJava 中。

关于android - 如何在错误情况下非阻塞地组合多个 RxJava 链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44643106/

相关文章:

android - Spinner 检测关闭事件

android - 尽管实现了 doOnError,但 RxAndroid 使应用程序崩溃

android - 如何使用 RXJava 调用的响应结果作为另一个 RXJava 函数内 if 语句的条件?

javascript - 如何从函数创建 Observable?

java - RxJava retryWhen 异常行为

java - 进入多窗口模式时奇怪的生命周期回调排序

链接共享 C 库时出现 Android NDK 错误

android - 即使在应用程序卸载后,Cordova LocalStorage 也会保留数据

android - 无法安装 PhoneGap 插件

java - 使用 SimpleDateFormat 在 Android 中解析长日期