java - 如果 Reactor Flux 未在设定时间内完成,则抛出异常

标签 java project-reactor

我有一个可能会长时间运行的 Flux,我想在经过一定时间后停止它。我已经找到了几种执行此操作的方法,但是我正在努力解决的是如何能够判断 Flux 超时而不是自然完成。

示例(非常简单)代码:

Flux.range(0, 10000)
        .take(Duration.ofMillis(1))
        .doOnNext(System.out::println)
        .collectList()
        .block();

我想要的是这样的:

Flux.range(0, 10000)
        .take(Duration.ofMillis(1))
        .doOnNext(System.out::println)
        .doOnError(t -> {
            if (t instanceof TimeoutException) {
                System.out.println("I timed out");
            }
        })
        .collectList()
        .block();

但是 take 似乎没有通知错误;我所看到的只是一个 terminatecomplete 信号,如果我不将 take 包含在 Flux 中并让它运行,我会得到这个信号自然完成。

我简单地研究了 timeout 运算符,它确实抛出了异常,但是 timeout 看起来只有在 Flux 不发出异常时才会抛出异常特定时间内的单个元素,而不是整个 Flux 在特定时间内未完成。

有人有解决此问题的任何提示或示例吗?

提前致谢!

最佳答案

已更新

您可以使用 takeUntilOther 运算符在给定时间后发出错误信号:

Duration timeout = Duration.ofMillis(500);
Flux.range(0, 10000)
    .delayElements(Duration.ofMillis(10))
    .doOnNext(System.out::println)
    .takeUntilOther(Mono.delay(timeout).then(Mono.error(new TimeoutException())))
    // .takeUntilOther(Mono.never().timeout(Duration.ofMillis(500))) // based on Michael's answer this is also an option
    .doOnError(t -> {
        if (t instanceof TimeoutException) {
            System.out.println("I timed out");
        }
    })
    .blockLast();

如果使用 .collectList().then() 运算符将 Flux 转换为 Mono ,然后您只需应用 .timeout(...) 运算符,它就会按照您的要求运行:

Flux.range(0, 10000)
    .delayElements(Duration.ofMillis(10))
    .doOnNext(System.out::println)
    .collectList()
    .timeout(Duration.ofMillis(500))
    .doOnError(t -> {
        if (t instanceof TimeoutException) {
            System.out.println("I timed out");
        }
    })
    .block();

关于java - 如果 Reactor Flux 未在设定时间内完成,则抛出异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67842409/

相关文章:

java.lang.NoClassDefFoundError : javax/el/PropertyNotFoundException when I send invalid values to controller 错误

java - 使用 java 邮件 API 从网站发送自动 javamail

java - 从 JFrame 类调用串行端口类

java - 如何使用 reactor-netty TcpClient 链接多个发送和接收操作

java - WebFlux 条件平面图

spring-boot - Reactor 3 - 如何在 Mono 错误时返回 Flux?

kotlin - Project Reactor onErrorMap 抛出的测试异常

java - 这是读取文本文件的好方法吗?

java - 大型应用程序的 session 管理 (Java)

java - 如何在 Spring Webflux Controller 中结合 Flux 和 ResponseEntity