kotlin - 具有自定义计数标准的 RxJava 缓冲区/窗口

标签 kotlin rx-java rx-java2

我有一个 Observable,它发出许多对象,我想使用 windowbuffer 操作对这些对象进行分组。但是,我希望能够使用自定义条件,而不是指定 count 参数来确定窗口中应有多少对象。

例如,假设可观察对象正在发出 Message 类的实例,如下所示。

class Message(
   val int size: Int
)

我想根据消息实例的大小变量而不只是计数来缓冲或窗口化消息实例。例如,获取总大小最多为5000的消息窗口。

// Something like this
readMessages()
    .buffer({ message -> message.size }, 5000)

有没有简单的方法可以做到这一点?

最佳答案

首先我必须承认,我不是 RxJava 专家。 我刚刚发现你的问题具有挑战性,并试图找到解决方案。

有一个带有参数boundaryIndi​​catorwindow()函数。您必须创建一个在达到窗口大小时发出项目的 Publisher/Flowable

在示例中,我创建了一个用作boundaryIndi​​cator 的对象windowManager。在 onNext 回调中,我调用 windowManager 并给它一个打开新窗口的机会。

val windowManager = object {
    lateinit var emitter: FlowableEmitter<Unit>
    var windowSize: Long = 0

    fun createEmitter(emitter: FlowableEmitter<Unit>) {
        this.emitter = emitter
    }

    fun openWindowIfRequired(size: Long) {
        windowSize += size
        if (windowSize > 5) {
            windowSize = 0
            emitter.onNext(Unit)
        }
    }
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
    it.doOnNext {
        windowManager.openWindowIfRequired(it)
    }.doOnSubscribe {
        println("Open window")
    }.doOnComplete {
        println("Close window")
    }.subscribe {
        println(it)
    }
}

关于kotlin - 具有自定义计数标准的 RxJava 缓冲区/窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52791691/

相关文章:

java - 收到 RxJava Debounce 方法调用

java - 如何在 RxJava Vert.x 中结束链式 http 请求?

mvvm - 如何在ViewModel层上将普通数据(从房间返回)转换为LiveData

rx-java - 如何正确停止 rxjava Flowable?

java - RxJava2 : Repeat conditonally/don't repeat in `repeatWhen`

android - 从 kotlin 代码中删除 getter 和 setter

kotlin - GlobalScope 与 LifecycleOwner :CoroutineScope

java - Java 的 String[] 的 Kotlin 等价物是什么?

android - 如果适配器的数据通过 RxAndroid 设置,则 RecyclerView 在返回导航后滚动到顶部

android - 如何在 kotlin 中获取 HTML 输入?