嗨,我有一个 rxJava observable 和 Flatmap,我想将其转换为 kotlin 协程流。
rxJava 可观察
val startFuellingObservable: Observable<Void>
订阅/平面 map
subscriptions += view.startFuellingObservable
.onBackpressureLatest()
.doOnNext { view.showLoader(false) }
.flatMap {
if (!hasOpenInopIncidents()) {
//THIS API CALL RETURNS RX OBSERVABLE
startFuellingUseCase.execute(equipmentProvider.get())
} else {
val incidentOpenResponse = GenericResponse(false)
incidentOpenResponse.error = OPEN_INCIDENTS
Observable.just(incidentOpenResponse)
}
}
.subscribe(
{ handleStartFuellingClicked(view, it) },
{ onStartFuellingError(view) }
)
我已将可观察值更改为 Flow
val startFuellingObservable: Flow<Void>
现在的流程
我能做到
view.startFuellingObservable
.onEach { view.showLoader(false) }
*** 我已进行 API 调用以返回 Flow 而不是 observable
但我不知道如何使用 Flow 完成平面图的其余部分
您能否建议如何使用 Flow 执行相同的代码
谢谢
最佳答案
迟到的回答,但我希望它可以帮助其他人。 首先,有一个来自 kotlin Concurrent 的 Flow,所以你肯定需要导入
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.0'
属于进口kotlinx.coroutines.flow
-
Observables<T>
来自 RxJava 的将是Flow<T>
- Rxjava FlatMap 是 Kotlin Flow API 中的 FlatMapMerge
FlatMapMerge 示例:
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
result:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
Flow API 中有 3 种类型的 FlatMap
FlatMapConcat
该运算符是连续且成对的。一旦outerFlow发射一次,innerFlow也必须发射一次才能收集最终结果。一旦任一流发出第 N 次,另一个流必须发出第 N 次,然后才能收集第 N 个 flatMapResult。
FlatMapMerge
该运营商对排放的限制最少,但可能会导致排放过多。每次outerFlow 发出一个值时,每个innerFlow 发出的值都会从该值平坦映射到要收集的最终flatMapResult 中。最终的排放计数是innerFlow 和outerFlow 排放量的乘积。
FlatMap最新
该算子只关心最新的排放结果,不处理旧的排放。每次outerFlow发出一个值时,它都会与最新的innerFlow值进行平面映射。每次innerFlow发出一个值时,它都会与最新的outerFlow值进行平面映射。因此,最终的排放计数是介于零和innerFlow 排放乘以outerFlow 排放之间的值。
关于android - Kotlin 协程 Flow 中 RxJava Observable 和 FlatMap 的等价物是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65705638/