java - 订阅可连接的热源时发生 CancellationException

标签 java project-reactor reactive-streams

我正在使用 react 堆核心 3.1.4 .

考虑以下代码片段:

Flux<String> flux = Flux.<String>create(sink -> sink.next("test"))
    .replay(1)
    .refCount();

flux.subscribe(System.out::println);
flux.next().subscribe(System.out::println); // The exception is thrown here!

预期输出:

test
test

实际输出:

test
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
    at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1202)
    at reactor.core.publisher.OperatorDisposables.dispose(OperatorDisposables.java:132)
    at reactor.core.publisher.FluxRefCount$RefCountMonitor.innerCancelled(FluxRefCount.java:132)
    at reactor.core.publisher.FluxRefCount$RefCountInner.cancel(FluxRefCount.java:200)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:75)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:177)
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:808)
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:892)
    at reactor.core.publisher.FluxReplay.subscribe(FluxReplay.java:1085)
    at reactor.core.publisher.FluxRefCount$RefCountMonitor.subscribe(FluxRefCount.java:116)
    at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:77)
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3077)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3071)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3038)
    at reactor.core.publisher.Mono.subscribe(Mono.java:2985)
    at test.Test.main(Test.java:10)

对我来说,这是reactor-core库中的一个错误。我的陈述正确还是我遗漏(误解)了什么?

谢谢, 斯特凡

最佳答案

经确认,这是一个bug 。它固定在 react 堆堆芯3.1.5 .

关于java - 订阅可连接的热源时发生 CancellationException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48888858/

相关文章:

java - Persistence.createEntityManagerFactory() 在哪里寻找持久性单元?

java - 在 Java 中旋转 tiff 图像的最干净、最有效的技术?

java - 我如何等待多个单声道一次完成并获得值(value)

java - Spring数据响应式(Reactive)mongodb : How to retrieve generated IDs after failed insertAll()

java - 打印右对齐三角形

java - 根据条件在ant中复制文件

spring-webflux - Mono/Flux.fromCallable 和 Mono.defer 的区别

java - 在 Mono 的阻塞线程上执行转换步骤

java - 在 react 流中哪里放置参数验证?

java - 多个枚举与一个枚举