kotlin - Observable withLatestFrom 值

标签 kotlin rx-java system.reactive publishsubject

我实现了一个名为“FilterByLatestFrom”的伪运算符作为 kotlin 的扩展函数。

我使用此运算符编写了下一个代码:

    fun testFilterByLatestFromOperator(){
    val observableA : Observable<Int> = Observable.fromArray(1,2,3,4,5,6,7,8,9,10)
    val observableC : PublishSubject<Int> = PublishSubject.create()
    val observableB : Observable<Int> = Observable.just(2).mergeWith(observableC)

    observableB.subscribe { println("observableB onNext: $it") }

    observableA
            .subscribe({ println("Original : $it")})

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
            .subscribe({ println("Result A : $it") })

    observableC.onNext(3)

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
            .subscribe({ println("Result AC : $it") })
}

输出是:

observableB onNext: 2
Original : 1
Original : 2
Original : 3
Original : 4
Original : 5
Original : 6
Original : 7
Original : 8
Original : 9
Original : 10
Result A : 2
Result A : 4
Result A : 6
Result A : 8
Result A : 10
observableB onNext: 3
Result AC : 2
Result AC : 4
Result AC : 6
Result AC : 8
Result AC : 10

我希望过滤器运算符将根据可观察 B 的最后一个值过滤 obsA。 它适用于第一个 block ,但是当我添加 On-next 和新值时,它不会改变结果(使用原始可观察值中相同的最后一个值)。

这是 FilterByLatestFrom impl(它被设计为也可以在 Java 中使用(通过 compose):

class FilterByLatestFrom<T,U>(private val observable: Observable<T>, private val biFunction: BiFunction<U, T, Boolean>){
fun filter() : ObservableTransformer<U,U> = ObservableTransformer {
    it
            .withLatestFrom(
                    observable,
                    BiFunction<U,T,Pair<U,Boolean>> {
                        u, t -> Pair(u,biFunction.apply(u,t))
                    })
            .filter { it.second }
            .map { it.first }
    }
}
fun <T,U> Observable<U>.filterByLatestFrom(observable: Observable<T>, biFunction: BiFunction<U, T, Boolean>) : Observable<U> =
        this.compose(FilterByLatestFrom(observable,biFunction).filter())

我错过了什么?

编辑:我想我发现了问题:PublishSubject 应该是BehaviorSubject。并且合并函数应该是 conacat 以保证 obsC 将在 obsB 之后发出。

最佳答案

您的伪运算符 filterByLatestFrom 很好,问题出在测试中,PublishSubject 将仅发出后续项目,因此当您上次订阅时 ('result AC'),observableB 将仅发出 2,因为 observableC 已经发出 3 并且不会将其重播到 observableB(使用 merge )。

只需将 observableC.onNext(3) 移至最后一个订阅(最后一行)之后,您就应该看到预期的行为。

编辑: 也更改为 PublishSubject 就像您解决相同问题一样(主题将重播新订阅的最后一个值)

关于kotlin - Observable withLatestFrom 值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47347861/

相关文章:

android - TLS Client Hello 请求在 iOS 或 Android 中包含未知版本

java - 如何在 Kotlin 中创建匿名接口(interface)的实例?

java - Kotlin 反射 + 泛型

java - 具有异步数据库调用的 Dropwizard @UnitOfWork

java - "More produced than requested"使用 `replay` 和 `autoConnect` 时出现异常

kotlin - RxJava 如何将 Rxjava 中的 Single<T> 转换为 Retrofit 中的 Call<T> 或相反转换

c# - RX - 重新抛出包含方法的错误

android - 为什么 Android 上下文菜单是空的?

c# - 如果没有 GetAwaiter 方法,我可以 await 吗?

c# - 使用 System.Reactive.Linq 订阅第一个事件发生并取消