我有一个 Observable,它发出许多对象,我想使用 window
或 buffer
操作对这些对象进行分组。但是,我希望能够使用自定义条件,而不是指定 count
参数来确定窗口中应有多少对象。
例如,假设可观察对象正在发出 Message
类的实例,如下所示。
class Message(
val int size: Int
)
我想根据消息实例的大小变量而不只是计数来缓冲或窗口化消息实例。例如,获取总大小最多为5000的消息窗口。
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)
有没有简单的方法可以做到这一点?
最佳答案
首先我必须承认,我不是 RxJava 专家。 我刚刚发现你的问题具有挑战性,并试图找到解决方案。
有一个带有参数boundaryIndicator
的window()
函数。您必须创建一个在达到窗口大小时发出项目的 Publisher
/Flowable
。
在示例中,我创建了一个用作boundaryIndicator
的对象windowManager
。在 onNext
回调中,我调用 windowManager
并给它一个打开新窗口的机会。
val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0
fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}
fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}
val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}
关于kotlin - 具有自定义计数标准的 RxJava 缓冲区/窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52791691/