java - RxJava BehaviorSubject 没有发出最后一项?

标签 java kotlin rx-java rx-java2

我有一个简单的 RxJava,使用 ReplaySubject,我可以获得结果,其中打印了所有 3 个数字。

    val observable : Observable<Int> = Observable.just(1, 2, 3)
    val subject = ReplaySubject.create<Int>()
    observable.subscribe(subject)
    subject.subscribe{
        result ->
        System.out.println("Start $result in Subscription Result")
    }

当我更改为 Behavior 时,我希望打印第三个数字,即 3,因为我一直认为 Behavior 是重播最后发出的项目。

    val observable : Observable<Int> = Observable.just(1, 2, 3)
    val subject = BehaviorSubject.create<Int>()
    observable.subscribe(subject)
    subject.subscribe{
        result ->
        System.out.println("Start $result in Subscription Result")
    }

但是它不打印任何东西。为什么?

我在这里错过了什么重要的东西吗?如果是,请告诉我如何打印假设发出的最后一项(即 3)。

最佳答案

它不打印任何东西,因为订阅已经终止。如果订阅仍然有效,那么将打印 3,例如:

val o1: Observable<Int> = Observable.just(1, 2, 3)
val o2: Observable<Int> = Observable.just(4).delay(100,TimeUnit.MILLISECONDS)
val observable: Observable<Int> = Observable.concat(o1, o2)
val subject = BehaviorSubject.create<Int>()
observable.subscribe(subject)
subject.subscribe{
    result ->
    System.out.println("Start $result in Subscription Result")
}
Thread.sleep(1000)

将打印 3 和 4(延迟后),其中 3 作为订阅前的最新事件发出,而 4 在订阅后发出。

此外,正如@akarnokd 在评论部分中所解释的,ReplaySubject.createWithSize(1) 可用于始终重播最后一个项目,即使在可观察到的完成之后,如果需要单个项目而不考虑流完成状态,则observable.takeLast(1).subscribe(subject) 可用于保证:

val observable : Observable<Int> = Observable.just(1, 2, 3)
val subject = ReplaySubject.createWithSize<Int>(1)
observable.takeLast(1).subscribe(subject) //can be moved after subject.subscribe as well
subject.subscribe{
    result ->
    System.out.println("Start $result in Subscription Result")
}

关于java - RxJava BehaviorSubject 没有发出最后一项?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45253857/

相关文章:

android - 媒体播放器和 RecyclerView kotlin

android - 如何在 Android Jetpack Compose 中禁用可组合预览?

android - 如何使imageView脱离屏幕[Kotlin]

java - 为什么 RxJava 在异步处理时只使用约 10 个线程?

java - RxJava 错误处理

java - Lombok 进口问题

java - 发送 JPA 包装器对象

java - 如何计数并阻止在另一个线程上执行的 Observable?

java - volatile 关键字有什么用?

Java——传递泛型参数