android - Kotlin 协程 Flow 中 RxJava Observable 和 FlatMap 的等价物是什么

标签 android kotlin rx-java kotlin-coroutines kotlin-flow

嗨,我有一个 rxJava observable 和 Flatmap,我想将其转换为 kotlin 协程流。

rxJava 可观察

val startFuellingObservable: Observable<Void>

订阅/平面 map

subscriptions += view.startFuellingObservable
        .onBackpressureLatest()
        .doOnNext { view.showLoader(false) }
        .flatMap {
            if (!hasOpenInopIncidents()) {
                 //THIS API CALL RETURNS RX OBSERVABLE
                startFuellingUseCase.execute(equipmentProvider.get()) 
            } else {
                val incidentOpenResponse = GenericResponse(false)
                incidentOpenResponse.error = OPEN_INCIDENTS
                Observable.just(incidentOpenResponse)
            }
        }
        .subscribe(
            { handleStartFuellingClicked(view, it) },
            { onStartFuellingError(view) }
        )

我已将可观察值更改为 Flow

val startFuellingObservable: Flow<Void>

现在的流程

我能做到

view.startFuellingObservable
            .onEach { view.showLoader(false) }

*** 我已进行 API 调用以返回 Flow 而不是 observable

但我不知道如何使用 Flow 完成平面图的其余部分

您能否建议如何使用 Flow 执行相同的代码

谢谢

最佳答案

迟到的回答,但我希望它可以帮助其他人。 首先,有一个来自 kotlin Concurrent 的 Flow,所以你肯定需要导入

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.0'

属于进口kotlinx.coroutines.flow

  1. Observables<T>来自 RxJava 的将是 Flow<T>
  2. Rxjava FlatMap 是 Kotlin Flow API 中的 FlatMapMerge

FlatMapMerge 示例:

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapMerge { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

result:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

Flow API 中有 3 种类型的 FlatMap

FlatMapConcat

该运算符是连续且成对的。一旦outerFlow发射一次,innerFlow也必须发射一次才能收集最终结果。一旦任一流发出第 N 次,另一个流必须发出第 N 次,然后才能收集第 N 个 flatMapResult。

FlatMapMerge

该运营商对排放的限制最少,但可能会导致排放过多。每次outerFlow 发出一个值时,每个innerFlow 发出的值都会从该值平坦映射到要收集的最终flatMapResult 中。最终的排放计数是innerFlow 和outerFlow 排放量的乘积。

FlatMap最新

该算子只关心最新的排放结果,不处理旧的排放。每次outerFlow发出一个值时,它都会与最新的innerFlow值进行平面映射。每次innerFlow发出一个值时,它都会与最新的outerFlow值进行平面映射。因此,最终的排放计数是介于零和innerFlow 排放乘以outerFlow 排放之间的值。

关于android - Kotlin 协程 Flow 中 RxJava Observable 和 FlatMap 的等价物是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65705638/

相关文章:

android - 无法在内部目录中保存文件

android - 需要 RXJava 异步帮助。输入被调用阻塞

android - 如何将线性布局 xml 文件合并到撰写文件中?

android - 为什么在 Android onCreate 下匿名登录时 Firebase 控制台不创建匿名用户?

android - RxJava 和 Android : how to invalidate observable that use cache()?

android - 长链中的 rxjava StackOverflowError 异常

Android Wear Preview 未连接到 Wear Emulator

android - 在金鱼内核中添加类似的头文件

android - 与 Android 平板电脑刷新 MTP 连接?

java - 使拇指的中心 TextView 并使用搜索栏进度移动 TextView