rx-java - RxJava2 : How to improve parallel data downloading and caching?

标签 rx-java rx-java2

我正在努力通过 RxJava2。我想知道我的解决方案是否可以接受或者是否有任何方法可以改进。

用例

  1. 用户按下更新数据按钮
  2. 显示一个对话框 - 请稍候
  3. 并行处理多个后端调用
  4. 一旦完成其中任何一项 - 数据就会保存在本地数据库中
  5. 所有请求完成后(后端调用和持久化),对话框应关闭

当前解决方案

我有几个看起来像这样的Completables:

Completable organisationUnitCompletable = backendService.getOrganisationUnits()
    .doOnNext(data -> organisationUnitDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

Completable locationCompletable = backendService.getLocations()
    .doOnNext(data -> locationDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

Completable prioritiesCompletable = backendService.getPriorities()
    .doOnNext(data -> priorityDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

我通过添加到列表并使用 merge 运算符将它们打包为一个:

List<Completable> compatibles = new ArrayList<>();
compatibles.add(organisationUnitCompletable);
compatibles.add(locationCompletable);
compatibles.add(prioritiesCompletable);

Completable.merge(compatibles)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() -> {
     progressDialog.dismiss();
});

可能的改进

好的,这按预期工作。但有些事情我不太高兴。

我真的必须将 subscribeOn(Schedulers.io()) 添加到每个 Completable 中吗?没有它它就无法并行工作,但也许有更好的方法来做到这一点?

所有可完成的内容都有这些行。

    .ignoreElements()
    .subscribeOn(Schedulers.io());

有没有办法将其提取到一个方法中?我尝试过这样的事情:

private <T> Completable prepareCompletable(Function<Void, Observable<List<T>>> source, AbstractDao<T, Long> dao) {

    Completable orderTypeCompletable = source
            .doOnNext(data -> dao.saveInTx(data))
            .ignoreElements()
            .subscribeOn(Schedulers.io());
}

我将把 Observable 和 DAO 放入其中。当然不能编译。看来它需要的关于泛型的知识比我已有的知识多得多。

抱歉问了这么长的问题,很难用几句话解释整个用例。

最佳答案

Do I really have to add the subscribeOn(Schedulers.io()) to each Completable?

是的,但在 Completable.merge() 之后您不需要它。

Is there a way to extract it into one method?

public static <T> Function<Flowable<T>, Completable> applyIgnore() {
    return f -> f.ignoreElements().subscribeOn(Schedulers.io());
}

Completable locationCompletable = backendService.getLocations()
.doOnNext(data -> locationDao.saveInTx(data))
.to(applyIgnore());

关于rx-java - RxJava2 : How to improve parallel data downloading and caching?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40821288/

相关文章:

java - 每个 Java Future 在其生命周期中控制一个核心

java - 映射器函数返回了一个空值

java - 正确将 RxJava 代码转换为 Kotlin

android - RxJava 使用 zip 链接多个调用

android - Rxjava 和工作管理器链式异步调用

java - 在 RxJava2 中使用背压将 Observable 转换为 Flowable

Android MVP - RxJava 和改造 - 最佳方法

java - RxAndroid 简化一个常见的模式?

java - Android 未处理的异常

multithreading - RxJava 并行化是否打破了 Observable 契约?