在 RxJava 2 Flowable 中有不同的背压策略,其中最有趣的是:
在整个 Rx 链中都受到尊重。
在 Kotlin 中有 Flow,它声明它具有开箱即用的背压支持。
我能够通过使用以下内容使 Flow 具有 BUFFER 和 LATEST 策略:
对于缓冲器:
observeFlow()
.buffer(10)
.collect { ... }
最新:
observeFlow()
.conflate()
.collect { ... }
这只是同一个缓冲区运算符的快捷方式。
但是我找不到任何可以与 DROP 相同的东西。
简而言之,当前一个值尚未处理时,DROP 将删除流中的任何值。
对于 Flow,我什至不确定这是否可能。
考虑案例:
observeFlow()
.backpressureDrop() // non-existent operator, just for illustrative purposes
.map { ... }
.flatMapMerge { ... }
.collect { ... }
因此,backpressureDrop 应该尊重流中下面完成的任何工作,而该运算符(operator)对下面发生的事情一无所知(没有来自底部的显式回调——就像 RxJava 订阅者中的“请求”方法)。因此似乎不太可能。在收集前一个项目之前,该运算符(operator)不应通过任何事件。
是否有任何现成的运算符,我想念它,或者是否有一种简单的方法可以使用现有的 API 实现这样的东西?
最佳答案
is there a straightforward way to implement something like this
取决于你的直截了当的衡量标准。这是我将如何做到的。
背压转化为协同程序世界中的程序暂停和恢复。对于
onBackpressureDrop
,下游必须表明它已准备好接受一项并暂停它,而上游不应等待下游准备好。您必须以无限制的方式消耗上游并将项目和终端事件移交给下游等待这些信号。
package hu.akarnokd.kotlin.flow.impl
import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
: AbstractFlow<T>() {
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
coroutineScope {
val consumerReady = AtomicBoolean()
val producerReady = Resumable()
val value = AtomicReference<T>()
val done = AtomicBoolean()
val error = AtomicReference<Throwable>();
launch {
try {
source.collect {
if (consumerReady.get()) {
value.set(it);
consumerReady.set(false);
producerReady.resume();
}
}
done.set(true)
} catch (ex: Throwable) {
error.set(ex)
}
producerReady.resume()
}
while (true) {
consumerReady.set(true)
producerReady.await()
val d = done.get()
val ex = error.get()
val v = value.getAndSet(null)
if (ex != null) {
throw ex;
}
if (d) {
break;
}
collector.emit(v)
}
}
}
}
注:Resumable执行。
那么让我们来看看实现。
首先,需要5个变量在上游的收集器和为下游工作的收集器之间传递信息:
-
consumerReady
表示下游已准备好进行下一项,-
producerReady
表示生产者已经存储了下一个项目(或终端信号)并且下游可以恢复-
value
准备消费的上游项目-
done
上游已经结束-
error
上游失败了接下来,我们必须为上游启动收集器,因为 collect 正在暂停并且在完成之前根本不会让下游消费者循环运行。在这个收集器中,我们检查下游消费者是否准备好(通过
consumerReady
),如果是,则存储当前项目,清除就绪标志并通过 producerReady
表示其可用性.清除 consumerReady
将阻止后续上游项目的存储,直到下游本身表明新的准备就绪。当上游结束或崩溃时,我们设置
done
或 error
变量并表明生产者已经说过。后
launch { }
部分,我们现在将代表下游收集器继续使用共享变量。每轮中的第一件事是表明我们已准备好接受下一个值,然后等待生产者端的信号,它将下一个事件放入共享变量中。
接下来,我们从这些变量中收集值。我们渴望完成或抛出错误,并且仅作为最后的手段将上游项目重新发送到下游收集器。
关于Kotlin Flow onBackpressureDrop RxJava2 模拟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59910643/