java - 在热流上重试操作?

标签 java project-reactor

Flux.just(1, 2, 3)
        .doOnNext(__ -> System.out.println("producing number: " + __))
        .publish()
        .autoConnect()
        .doOnNext((Integer __) -> {
            System.out.println("throwing error.");
            throw new RuntimeException("aaa");
        })
        .retry(1, error -> true)
        .subscribe(number -> System.out.println("I will never be here..."),
                error -> System.out.println("will I be here? " + error),
                () -> System.out.println("completed!"));

输出:

producing number: 1
throwing error.
producing number: 2
producing number: 3

预期输出:(根据我的逻辑 - 我确信我错了)

producing number: 1
throwing error.
producing number: 2
throwing error.
producing number: 3

为什么输出与预期输出不同?

最佳答案

这有点棘手,但归结为:

调用 publishautoConnect 的效果是,一旦您订阅,就会创建一个内部订阅,即使外部订阅被取消,该内部订阅也会持续存在。将其视为通过 publishautoConnect 连接的两个流。

--inner stream--> publish/connect-> --outer stream--> subscriber

无论外部流发生什么,内部流都会继续运行。这是有意为之的,因为您有一个热门的可观察对象,并且您可能有多个订阅者。如果其中一个断开连接,则其他人仍然希望从流中获取值。换句话说,发布运算符告诉源它可以根据需要生成值。

    --inner stream--> publish/connect-> --outer stream--> subscriber1
                                        --outer stream--> subscriber2

您可以通过删除重试来验证该行为。数字 1 和 2 仍将被打印。如果您希望源停止生成值,可以使用 refCount 而不是 autoConnect。如果没有更多订阅者,refCount 将取消内部流。

棘手的部分来了:这个流是同步的并且 Streams are just functions 。它的底层有点复杂,但是重试运算符内的订阅函数会一直运行,直到内部流完成。只有这样它才会重新订阅。

这与异步流不同,例如使用Flux.interval创建。

Flux.interval(Duration.ofSeconds(1))
    .doOnNext(__ -> System.out.println("producing number: " + __))
    .publish()
    .autoConnect()
    .doOnNext((Integer __) -> {
        System.out.println("throwing error.");
        throw new RuntimeException("aaa");
    })
    .retry(1)
    .subscribe()

当您调用 .subscribe() 时,retry 将在内部调用 doOnNext 上的订阅,依此类推,直到第一个 doOnNext 在 Flux 上调用 subscribe。间隔 Flux 表示“好的,我将在一秒钟内发出第一个值”,并且订阅已建立。

一秒钟后,发出一个值,抛出一个错误,重试运算符再次订阅。

在您的示例中,Flux 已经在订阅完全建立之前在订阅调用期间开始发出值,可以这么说。

关于java - 在热流上重试操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51107243/

相关文章:

java - 在 flatMap 上使用reduce时,Reactor Flux订阅者流停止

java - 未找到异常 hibernate.properties

java - 从 linux 命令行生成覆盖率报告

java - Maven 测试中的编码无法正常工作

java - 这个 for 循环如何知道这个数组数据结构从一开始有多长?

Spring Boot 2. 异步 API。 CompletableFuture 与 Reactive

java - 读取文件时忽略某些单词

java - 如何在 Java 中的某个时间后停止执行 Flux?

java - Kotlin,响应式(Reactive)编程 : How to consume the value of one function output to another one

java - 当 ParallelFlux 发生错误时回滚所有更改