我有一个可完成列表,默认情况下,我使用 concat/andThen 运算符一个接一个地运行它们。 有时我希望部分可完成项并行运行,并在所有内容完成后继续执行列表中的下一个可完成项。 我试图用这段代码实现这一点:
var completable =
getAsyncCompletables()?.let {
it
} ?: run {
completables.removeAt(0).getCompletable()
}
while (completables.isNotEmpty()) {
val nextCompletable = getAsyncCompletables()?.let {
it
} ?: run {
completables.removeAt(0).getCompletable()
}
completable = nextCompletable.startWith(completable)
}
completable
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe()
我使用此代码来检测异步可完成项:
private fun getAsyncCompletables(): Completable? {
if (completables.size < 2 || !completables[1].async) {
return null
}
var completable = completables.removeAt(0).getCompletable()
while (completables.isNotEmpty() && completables[0].async) {
completable = completable.mergeWith(completables.removeAt(0).getCompletable())
}
return completable
}
一切正常,除了一件事,尽管我使用了“startWith”,但最后一个可完成的未触发。 我也尝试了“concatWith”和“andThen”,但结果相同。
最佳答案
如果不查看更多代码,特别是 async
的作用以及 completables
的数据结构是什么,则很难回答。但是,无论这些值如何,您正在寻找的答案很可能是相似的。您可能希望使用 Completable.merge(...)
或 Completable.mergeArray(...)
。
根据文档:
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* ...
*/
为了实现并行执行,您需要使用新线程调用列表/数组/集合中的每个 Completables subscribeOn
。这可以通过 Schedulers.newThread()
或共享池(如 Schedulers.io()
)完成。
为了确定,我进行了测试。这是代码。
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
...
val completeOne = Completable.fromAction {
Timber.d("Completable #1 running on ${Thread.currentThread().name}")
}
val completeTwo = Completable.fromAction {
Timber.d("Completable #2 running on ${Thread.currentThread().name}")
}
val completeThree = Completable.fromAction {
Timber.d("Completable #3 running on ${Thread.currentThread().name}")
}
val completables = listOf(completeOne, completeTwo, completeThree).map { CompletableWrapper(it) }
val asyncCompletables = completables
.asSequence()
.filter { it.async }
.map { it.getCompletable().subscribeOn(Schedulers.io()) }
.toList()
Completable.merge(asyncCompletables)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Timber.i("Completed all completables")
}, Timber::e)
}
class CompletableWrapper(
private val completable: Completable,
val async: Boolean = true
) {
fun getCompletable() = completable
}
这是输出。
D/MainActivity$onCreate$completeThree: Completable #3 running on RxCachedThreadScheduler-3
D/MainActivity$onCreate$completeTwo: Completable #2 running on RxCachedThreadScheduler-2
D/MainActivity$onCreate$completeOne: Completable #1 running on RxCachedThreadScheduler-1
I/MainActivity$onCreate: Completed all completables
如您所见,它在池中的新线程上运行每个可完成项,并且仅在每个可完成项完成后才调用 completed all。
See here for the documentation on Completable.merge/mergeArray .
关于android - 链完成同步和异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53541520/