我想为 State
的不同实现处理不同的可观察逻辑链。这可以通过密封类/代数数据类型/联合 + .flatMap()
轻松实现,但这会破坏流,其中像 .distinctUntilChanged()
这样的运算符只能工作在 .flatMap()
函数中,而不是在整个流本身中。
sealed class State {
object Loading : State()
data class Loaded(val value: Int) : State()
}
@Test fun distinctTest() {
val relay = PublishRelay.create<State>()
relay.flatMap {
fun handle(state: State): Observable<*> = when (state) {
State.Loading -> Observable.just(state)
.distinctUntilChanged()
.doOnNext { println("loading") }
is State.Loaded -> Observable.just(state)
.distinctUntilChanged()
.doOnNext { println(it.value) }
}
handle(it)
}
.subscribe()
relay.accept(State.Loading)
relay.accept(State.Loaded(1))
relay.accept(State.Loaded(2))
relay.accept(State.Loaded(3))
relay.accept(State.Loaded(3))
//desired: loading, 1, 2, 3
//actual: loading, 1, 2, 3, 3
}
注意,这是一个简化的例子。虽然我只是在这里打印,但实际上我想根据 State
这可以通过主题/中继来完成,但这会创建一个断开的、可变的流,我也想避免这种情况。
最佳答案
您能否将 Observable
拆分为多个可观察对象,每个可观察对象获取单一类型的事件?然后,您可以对这些 observable 执行一些操作,然后再将它们重新合并在一起。
我现在无法对此进行测试,因此可能需要进行一些调整。无论如何,我希望你能明白这里的想法:
@Test fun distinctTest() {
val relay = PublishRelay.create<State>()
val loadingObs = relay.filter { it is State.Loading }
.distinctUntilChanged()
.doOnNext { println("loading") }
val loadedObs = relay.filter { it is State.Loaded }
.distinctUntilChanged()
.doOnNext { println(it.value) }
val merged = loadingObs.mergeWith(loadedObs)
merged.subscribe()
relay.accept(State.Loading)
relay.accept(State.Loaded(1))
relay.accept(State.Loaded(2))
relay.accept(State.Loaded(3))
relay.accept(State.Loaded(3))
// Hopefully prints this: loading, 1, 2, 3
}
关于rx-java - 如何在没有 .flatMap 的情况下控制流,这会破坏 react 流,从而阻止 distinctUntilChanged 等运算符在整个流上工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43504473/