android - 链完成同步和异步

标签 android rx-java

我有一个可完成列表,默认情况下,我使用 concat/andThen 运算符一个接一个地运行它们。 有时我希望部分可完成项并行运行,并在所有内容完成后继续执行列表中的下一个可完成项。 我试图用这段代码实现这一点:

    var completable =
            getAsyncCompletables()?.let {
                it
            } ?: run {
                completables.removeAt(0).getCompletable()
            }
        while (completables.isNotEmpty()) {
            val nextCompletable = getAsyncCompletables()?.let {
                it
            } ?: run {
                completables.removeAt(0).getCompletable()
            }
            completable = nextCompletable.startWith(completable)
        }
        completable
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe()

我使用此代码来检测异步可完成项:

 private fun getAsyncCompletables(): Completable? {
    if (completables.size < 2 || !completables[1].async) {
        return null
    }
    var completable = completables.removeAt(0).getCompletable()
    while (completables.isNotEmpty() && completables[0].async) {
        completable = completable.mergeWith(completables.removeAt(0).getCompletable())
    }
    return completable
}

一切正常,除了一件事,尽管我使用了“startWith”,但最后一个可完成的未触发。 我也尝试了“concatWith”和“andThen”,但结果相同。

最佳答案

如果不查看更多代码,特别是 async 的作用以及 completables 的数据结构是什么,则很难回答。但是,无论这些值如何,您正在寻找的答案很可能是相似的。您可能希望使用 Completable.merge(...)Completable.mergeArray(...)

根据文档:

 /**
  * Returns a Completable instance that subscribes to all sources at once and
  * completes only when all source Completables complete or one of them emits an error.
  * ...
  */

为了实现并行执行,您需要使用新线程调用列表/数组/集合中的每个 Completables subscribeOn。这可以通过 Schedulers.newThread() 或共享池(如 Schedulers.io())完成。

为了确定,我进行了测试。这是代码。

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    ...

    val completeOne = Completable.fromAction {
        Timber.d("Completable #1 running on ${Thread.currentThread().name}")
    }

    val completeTwo = Completable.fromAction {
        Timber.d("Completable #2 running on ${Thread.currentThread().name}")
    }

    val completeThree = Completable.fromAction {
        Timber.d("Completable #3 running on ${Thread.currentThread().name}")
    }

    val completables = listOf(completeOne, completeTwo, completeThree).map { CompletableWrapper(it) }
    val asyncCompletables = completables
        .asSequence()
        .filter { it.async }
        .map { it.getCompletable().subscribeOn(Schedulers.io()) }
        .toList()

    Completable.merge(asyncCompletables)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
            Timber.i("Completed all completables")
        }, Timber::e)
}

class CompletableWrapper(
    private val completable: Completable,
    val async: Boolean = true
) {
    fun getCompletable() = completable
}

这是输出。

D/MainActivity$onCreate$completeThree: Completable #3 running on RxCachedThreadScheduler-3
D/MainActivity$onCreate$completeTwo: Completable #2 running on RxCachedThreadScheduler-2
D/MainActivity$onCreate$completeOne: Completable #1 running on RxCachedThreadScheduler-1
I/MainActivity$onCreate: Completed all completables

如您所见,它在池中的新线程上运行每个可完成项,并且仅在每个可完成项完成后才调用 completed all。

See here for the documentation on Completable.merge/mergeArray .

关于android - 链完成同步和异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53541520/

相关文章:

java - robovm 插件无法在 Android Studio intellij 和 el Capitan 10.11.1 上启动

android - 在 Android 上使用 LuaJ 从 Lua 脚本中请求其他 lua 脚本

android - 当链接也通过深度链接重定向时在浏览器中打开

android - 如何在项目中包含 RxAndroid?

retrofit - 如何使用改造和 rxjava 取消任务

android - 不同屏幕尺寸的布局,为什么选择默认布局?

android - 如何圆角按钮但保持其默认外观

error-handling - ReactiveX收集失败之前处理的元素

android - 定期调用一个可观察对象并仅在下一个成功时切换到下一个

rx-java - 如何使用 RxJava 并行调用集合中每个元素的方法