我有一个可能会长时间运行的 Flux,我想在经过一定时间后停止它。我已经找到了几种执行此操作的方法,但是我正在努力解决的是如何能够判断 Flux 超时而不是自然完成。
示例(非常简单)代码:
Flux.range(0, 10000)
.take(Duration.ofMillis(1))
.doOnNext(System.out::println)
.collectList()
.block();
我想要的是这样的:
Flux.range(0, 10000)
.take(Duration.ofMillis(1))
.doOnNext(System.out::println)
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.collectList()
.block();
但是 take
似乎没有通知错误;我所看到的只是一个 terminate
和 complete
信号,如果我不将 take
包含在 Flux 中并让它运行,我会得到这个信号自然完成。
我简单地研究了 timeout
运算符,它确实抛出了异常,但是 timeout
看起来只有在 Flux 不发出异常时才会抛出异常特定时间内的单个元素,而不是整个 Flux 在特定时间内未完成。
有人有解决此问题的任何提示或示例吗?
提前致谢!
最佳答案
已更新
您可以使用 takeUntilOther
运算符在给定时间后发出错误信号:
Duration timeout = Duration.ofMillis(500);
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.doOnNext(System.out::println)
.takeUntilOther(Mono.delay(timeout).then(Mono.error(new TimeoutException())))
// .takeUntilOther(Mono.never().timeout(Duration.ofMillis(500))) // based on Michael's answer this is also an option
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.blockLast();
如果使用 .collectList()
或 .then()
运算符将 Flux
转换为 Mono
,然后您只需应用 .timeout(...)
运算符,它就会按照您的要求运行:
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.doOnNext(System.out::println)
.collectList()
.timeout(Duration.ofMillis(500))
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.block();
关于java - 如果 Reactor Flux 未在设定时间内完成,则抛出异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67842409/