rx-java - 如何在没有 .flatMap 的情况下控制流,这会破坏 react 流,从而阻止 distinctUntilChanged 等运算符在整个流上工作

标签 rx-java kotlin reactive-programming rx-java2 algebraic-data-types

我想为 State 的不同实现处理不同的可观察逻辑链。这可以通过密封类/代数数据类型/联合 + .flatMap() 轻松实现,但这会破坏流,其中像 .distinctUntilChanged() 这样的运算符只能工作在 .flatMap() 函数中,而不是在整个流本身中。

sealed class State {
    object Loading : State()
    data class Loaded(val value: Int) : State()
}

@Test fun distinctTest() {
    val relay = PublishRelay.create<State>()
    relay.flatMap {
        fun handle(state: State): Observable<*> = when (state) {
            State.Loading -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println("loading") }

            is State.Loaded -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println(it.value) }
        }
        handle(it)
    }
            .subscribe()

    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    relay.accept(State.Loaded(3))
    //desired: loading, 1, 2, 3
    //actual: loading, 1, 2, 3, 3
}

注意,这是一个简化的例子。虽然我只是在这里打印,但实际上我想根据 State

的实现类型执行不同的操作(以不同的方式渲染 UI)

这可以通过主题/中继来完成,但这会创建一个断开的、可变的流,我也想避免这种情况。

最佳答案

您能否将 Observable 拆分为多个可观察对象,每个可观察对象获取单一类型的事件?然后,您可以对这些 observable 执行一些操作,然后再将它们重新合并在一起。

我现在无法对此进行测试,因此可能需要进行一些调整。无论如何,我希望你能明白这里的想法:

@Test fun distinctTest() {
    val relay = PublishRelay.create<State>()

    val loadingObs = relay.filter { it is State.Loading }
                          .distinctUntilChanged()
                          .doOnNext { println("loading") }

    val loadedObs = relay.filter { it is State.Loaded }
                         .distinctUntilChanged()
                         .doOnNext { println(it.value) }

    val merged = loadingObs.mergeWith(loadedObs)

    merged.subscribe()

    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    relay.accept(State.Loaded(3))
    // Hopefully prints this: loading, 1, 2, 3
}

关于rx-java - 如何在没有 .flatMap 的情况下控制流,这会破坏 react 流,从而阻止 distinctUntilChanged 等运算符在整个流上工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43504473/

相关文章:

java - 调用 void 方法并传递参数 RxJava

java - 在 Android 上使用 SQLite 触发器返回错误

kotlin - 如何修改调用扩展函数的对象本身?

javascript - 如何以缓存方式迭代 Cold Observable

javascript - 与玻璃钢战斗

java - RxJava - 调用 Observables 方法时的非确定性行为

android - 从 fragment 中的 MainActivity 中从 JSON API 获取数据

java - 如何对 Android RxJava 多线程进行单元测试

android - 单个 android 应用程序中具有多用户数据库的房间

c# - 使用一个可观察量来同步另一个可观察量