Kotlin Flow onBackpressureDrop RxJava2 模拟

标签 kotlin rx-java2 kotlin-coroutines backpressure kotlin-flow

在 RxJava 2 Flowable 中有不同的背压策略,其中最有趣的是:

  • 最新
  • 缓冲器
  • 掉落

  • 在整个 Rx 链中都受到尊重。

    在 Kotlin 中有 Flow,它声明它具有开箱即用的背压支持。
    我能够通过使用以下内容使 Flow 具有 BUFFER 和 LATEST 策略:

    对于缓冲器:
    observeFlow()
        .buffer(10)
        .collect { ... }
    

    最新:
    observeFlow()
        .conflate()
        .collect { ... }
    

    这只是同一个缓冲区运算符的快捷方式。

    但是我找不到任何可以与 DROP 相同的东西。
    简而言之,当前一个值尚未处理时,DROP 将删除流中的任何值。
    对于 Flow,我什至不确定这是否可能。

    考虑案例:
    observeFlow()
        .backpressureDrop() // non-existent operator, just for illustrative purposes
        .map { ... }
        .flatMapMerge { ... }
        .collect { ... }
    

    因此,backpressureDrop 应该尊重流中下面完成的任何工作,而该运算符(operator)对下面发生的事情一无所知(没有来自底部的显式回调——就像 RxJava 订阅者中的“请求”方法)。因此似乎不太可能。在收集前一个项目之前,该运算符(operator)不应通过任何事件。

    是否有任何现成的运算符,我想念它,或者是否有一种简单的方法可以使用现有的 API 实现这样的东西?

    最佳答案

    is there a straightforward way to implement something like this



    取决于你的直截了当的衡量标准。这是我将如何做到的。

    背压转化为协同程序世界中的程序暂停和恢复。对于 onBackpressureDrop ,下游必须表明它已准备好接受一项并暂停它,而上游不应等待下游准备好。

    您必须以无限制的方式消耗上游并将项目和终端事件移交给下游等待这些信号。
    package hu.akarnokd.kotlin.flow.impl
    
    import hu.akarnokd.kotlin.flow.Resumable
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.AbstractFlow
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.FlowCollector
    import kotlinx.coroutines.flow.collect
    import java.util.concurrent.atomic.AtomicBoolean
    import java.util.concurrent.atomic.AtomicReference
    
    @FlowPreview
    internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
     : AbstractFlow<T>() {
        @ExperimentalCoroutinesApi
        @InternalCoroutinesApi
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            coroutineScope {
                val consumerReady = AtomicBoolean()
                val producerReady = Resumable()
                val value = AtomicReference<T>()
                val done = AtomicBoolean()
                val error = AtomicReference<Throwable>();
    
                launch {
                    try {
                        source.collect {
                            if (consumerReady.get()) {
                                value.set(it);
                                consumerReady.set(false);
                                producerReady.resume();
                            }
                        }
                        done.set(true)
                    } catch (ex: Throwable) {
                        error.set(ex)
                    }
                    producerReady.resume()
                }
    
                while (true) {
                    consumerReady.set(true)
                    producerReady.await()
    
                    val d = done.get()
                    val ex = error.get()
                    val v = value.getAndSet(null)
    
                    if (ex != null) {
                        throw ex;
                    }
                    if (d) {
                        break;
                    }
    
                    collector.emit(v)
                }
            }
        }
    }
    

    注:Resumable执行。

    那么让我们来看看实现。

    首先,需要5个变量在上游的收集器和为下游工作的收集器之间传递信息:
    - consumerReady表示下游已准备好进行下一项,
    - producerReady表示生产者已经存储了下一个项目(或终端信号)并且下游可以恢复
    - value准备消费的上游项目
    - done上游已经结束
    - error上游失败了

    接下来,我们必须为上游启动收集器,因为 collect 正在暂停并且在完成之前根本不会让下游消费者循环运行。在这个收集器中,我们检查下游消费者是否准备好(通过 consumerReady ),如果是,则存储当前项目,清除就绪标志并通过 producerReady 表示其可用性.清除 consumerReady将阻止后续上游项目的存储,直到下游本身表明新的准备就绪。

    当上游结束或崩溃时,我们设置 doneerror变量并表明生产者已经说过。

    launch { }部分,我们现在将代表下游收集器继续使用共享变量。

    每轮中的第一件事是表明我们已准备好接受下一个值,然后等待生产者端的信号,它将下一个事件放入共享变量中。

    接下来,我们从这些变量中收集值。我们渴望完成或抛出错误,并且仅作为最后的手段将上游项目重新发送到下游收集器。

    关于Kotlin Flow onBackpressureDrop RxJava2 模拟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59910643/

    相关文章:

    kotlin - 检查字符是否表示Kotlin中的非字符代码点

    reflection - 使用反射来传递和修改原语而不使用数组

    intellij-idea - IntelliJ Idea更改粘贴的代码

    rx-java2 - 订阅 2 个可观察对象,但如果第一个条件为真,则只订阅第二个

    java - 如何延迟 RxJava2 订阅?

    android - 如何合并到 kotlin 协程在一起?

    kotlin - 为什么一个普通函数需要用 viewModelScope.launch 包装?

    arrays - "No set method providing array access"-- 为什么在 Kotlin 中会发生这种情况?

    android - 收集服务中的流

    java - Android Retrofit2/RxJava2/Room——简单的数据处理