kotlin - 为什么 Sinks.many().multicast().onBackpressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它

标签 kotlin spring-webflux project-reactor reactive-streams

我在使用 Sinks.Many<String> 时遇到了我不明白的行为将一些事件通知给多个订阅者:

fun main() {

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
此代码显示第一个订阅者获得值 1 和 2,第二个订阅者获得 2。到目前为止一切顺利:
11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2
现在,假设第一个订阅者在第一次发射后处理(取消)其订阅,我期待第一个订阅者获得 1,第二个获得 2:

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    d.dispose()

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)

}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()
然而,当第二个订阅者尝试订阅时,通量被认为已完成。为什么会这样?我需要 Sinks.Many 随时可用,以便在不取消的情况下订阅和取消订阅。

最佳答案

我刚刚遇到了同样的问题。
这是由 autoCancel 默认为 true 引起的。不幸的是onBackpressureBuffer javadoc没有提及它。
此行为继承自 EmitterProcessor.create它被记录的地方。
要将 autoCancel 标志设置为 false,必须使用替代方法 onBackpressureBuffer

Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

关于kotlin - 为什么 Sinks.many().multicast().onBackpressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66671636/

相关文章:

android - 如何制作一串不易碎的空格?

kotlin - 为什么编译器认为这个 if 语句是一个表达式?

spring-boot - 从 Spring WebFlux 返回 Flux<String> 返回一个字符串而不是 JSON 中的字符串数组

spring-webflux - 如何使用 Reactor (Spring WebClient) 来模拟 for 循环

logging - 如何配置 kotlin-logging 记录器

Kotlin迭代器列出?

java - 使用 ReactiveCouchbaseRepository 时出现 IllegalArgumentException 期望找到类 rxSingle 的响应式(Reactive)适配器,但无法找到

spring - 非响应式(Reactive)远程服务器的响应式(Reactive) WebClient

json - 如何使用 Spring WebClient 按名称获取 json 字段?

project-reactor - 如何过滤掉空通量