java - RXJava2中几种方法的组合

标签 java android rx-java2 rx-android

事实是,我需要同时从本地数据库和服务器提取数据,同时检查与 Internet 的连接。

无需检查互联网很容易。但是当我关闭移动数据时,崩溃。

我不明白如何组合并决定这样做:

private void getCategories() {

    composite.add(getDataFromLocal(context)
            .observeOn(AndroidSchedulers.mainThread()).flatMap(new Function<PromoFilterResponse, ObservableSource<List<FilterCategory>>>() {
                @Override
                public ObservableSource<List<FilterCategory>> apply(PromoFilterResponse promoFilterResponse) throws Exception {
                    if (promoFilterResponse != null) {
                        PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
                        return combineDuplicatedCategories(promoFilterResponse);
                    } else {
                        return Observable.empty();
                    }
                }
            })
            .subscribe(new Consumer<List<FilterCategory>>() {
                @Override
                public void accept(List<FilterCategory> categories) throws Exception {
                    if (mView != null) {
                        mView.hideConnectingProgress();
                        if (categories != null && categories.size() > 0) {
                            mView.onCategoriesReceived(categories);
                        }
                    }
                }
            }));

    composite.add(InternetUtil.isConnectionAvailable().subscribe(isOnline -> {
        if (isOnline) {
            composite.add(
                    getDataFromServer(context)
                            .flatMap(new Function<PromoFilterResponse, ObservableSource<List<FilterCategory>>>() {
                                @Override
                                public ObservableSource<List<FilterCategory>> apply(PromoFilterResponse promoFilterResponse) throws Exception {
                                    if (promoFilterResponse != null) {
                                        PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
                                        return combineDuplicatedCategories(promoFilterResponse);
                                    } else {
                                        return Observable.empty();
                                    }
                                }
                            })
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(categories -> {
                                if (mView != null) {
                                    mView.hideConnectingProgress();
                                    if (categories != null && categories.size() > 0) {
                                        mView.onCategoriesReceived(categories);
                                    } else {
                                        mView.onCategoriesReceivingFailure(errorMessage[0]);
                                    }
                                }
                            }, throwable -> {
                                if (mView != null) {
                                    if (throwable instanceof HttpException) {
                                        ResponseBody body = ((HttpException) throwable).response().errorBody();

                                        if (body != null) {
                                            errorMessage[0] = body.string();
                                        }
                                    }
                                    mView.hideConnectingProgress();
                                    mView.onCategoriesReceivingFailure(errorMessage[0]);
                                }
                            }));
        } else {
            mView.hideConnectingProgress();
            mView.showOfflineMessage();
        }
    }));
} 


private Single<Boolean> checkNetwork(Context context) {
    return InternetUtil.isConnectionAvailable()
            .subscribeOn(Schedulers.io())
            .doOnSuccess(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    getDataFromServer(context);
                }
            });
}

private Observable<PromoFilterResponse> getDataFromServer(Context context) {
    return RetrofitHelper.getApiService()
            .getFilterCategories(Constants.PROMO_FILTER_CATEGORIES_URL)
            .subscribeOn(Schedulers.io())
            .retryWhen(BaseDataManager.isAuthException())
            .publish(networkResponse ->  Observable.merge(networkResponse,  getDataFromLocal(context).takeUntil(networkResponse)))
            .doOnNext(new Consumer<PromoFilterResponse>() {
                @Override
                public void accept(PromoFilterResponse promoFilterResponse) throws Exception {
                    PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    LogUtil.e("ERROR", throwable.getMessage());
                }
            });

}

private Observable<PromoFilterResponse> getDataFromLocal(Context context) {
    PromoFilterResponse response = PreferencesHelper.getObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, PromoFilterResponse.class);
    if (response != null) {
        return Observable.just(response)
                .subscribeOn(Schedulers.io());
    } else {
        return Observable.empty();
    }
}

可以看到,分别连接本地数据库,同时上网查询和从服务器上传数据。

但我觉得不太对。此外,订阅者是重复的等等。

看了很多教程,里面都有介绍本地数据库和API的结合,但是没看到同时处理和外网的连接错误。

我想很多人都遇到过这样的问题,你是如何解决的?

最佳答案

假设您有两个 Obsevable:一个来自服务器,另一个来自数据库

您可以将它们合并为一个流,如下所示:

  public Observable<Joke> getAllJokes() {

    Observable<Joke> remote = mRepository.getAllJokes()
            .subscribeOn(Schedulers.io());


    Observable<Joke> local = mRepository.getAllJokes().subscribeOn(Schedulers.io());

      return Observable.mergeDelayError(local, remote).filter(joke -> joke != null);
}

关于java - RXJava2中几种方法的组合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57272391/

相关文章:

java - 如何通过Java检查打印机是否连接到您的PC?

android - Firebase 突然停止工作,使用匿名身份验证

android - 使用 rxjava 进行 fragment 间通信

java - 在 RxJava2 中正确使用 Single 和控制流

rx-java2 - rxjava2 - 如何压缩也许可以是空的?

java - 为什么map被调用多次?

java - java中的进程同步

java - 如何使用 Pattern.matches 和正则表达式来过滤字符串中不需要的字符

android - WebGL 会是适用于 Android 平台的良好代码原型(prototype)吗?

java - 如何在迭代时删除和添加元素到 TreeMap?