java - 我如何在 RxJava 中显式地发出 Flowable 完成的信号?

标签 java kotlin rx-java2

我正在尝试创建一个 Flowable,它包装了一个 Iterable。我定期将元素推送到我的 Iterable 但似乎 completion 事件是隐式的。我不知道如何表示处理已完成。例如在我的代码中:

    // note that this code is written in Kotlin
    val iterable = LinkedBlockingQueue<Int>()
    iterable.addAll(listOf(1, 2, 3))

    val flowable = Flowable.fromIterable(iterable)
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.computation())

    flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})

    iterable.add(4)

    Thread.sleep(1000)

    iterable.add(5)

    Thread.sleep(1000)

这打印:

1 2 3 4 completed

我检查了 Flowable 接口(interface)的来源,但似乎我无法明确表示 Flowable 已完成。我该怎么做?在我的程序中,我发布的事件之间有一些延迟,我想明确何时完成事件流。

澄清: 我有一个长时间运行的进程,它会发出事件。我将它们收集在一个队列中,并公开一个方法,该方法返回一个环绕我的队列的 Flowable。问题是当我创建 Flowable 时队列中可能已经有元素了。我将只处理一次事件,并且我知道事件流何时停止,所以我知道何时需要完成 Flowable。

最佳答案

使用 .fromIterable 是为您的用例创建 Flowable 的错误方法。
我实际上并不清楚那个用例是什么,但你可能想使用 Flowable.create()PublishSubject

val flowable = Flowable.create<Int>( {
    it.onNext(1)
    it.onNext(2)
    it.onComplete()
}, BackpressureStrategy.MISSING)

val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
//This data will be dropepd unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()

当然,您如何处理背压将取决于数据源的性质。

关于java - 我如何在 RxJava 中显式地发出 Flowable 完成的信号?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42176634/

相关文章:

java - 在java中删除字符串的最后一部分

Java使用数组查找重复项(包含概念)

android - 使用 Retrofit 从服务器读取纯文本响应

rx-java2 - 取消订阅 RxJava2/RxAndroid PublishSubject

java - java游戏编程中求圆角矩形的中心

java - 哪种更好的方式通过 Tomcat 提供文件(用于下载)?

retrofit - 使用 okhttp 分块的传输编码仅提供完整结果

android - Realm 对象未更新

android - 使用 onPopulateAccessibilityEvent 和 AccessibilityDelegate 自定义 recyclerview 中的可访问性

kotlin - 为什么密封类的私有(private)构造函数可以在子类中调用?