想象一个重复执行的任务,每次执行之间最多有 10 秒的延迟。还有一个外部信号会导致任务立即执行。 (外部信号后,计时器是否重置回10秒并不重要。)
我尝试过两种建模方式,但都不理想:
使用
Flux.interval(Duration.ofSeconds(10))
进行基于计时器的执行。使用由Sinks.many()
创建的热通量,它接收外部信号。合并两个通量,并在合并结果的concatMap
运算符中执行任务。这种方法的问题是信号可能会在执行之前“堆积”。添加
onBackPressureLatest()
可以缓解堆耗尽,但如果任务的一次执行时间超过延迟间隔,则无法阻止多个信号排队。添加一个
delay()
,后跟一个repeat()
。这里的挑战是,当热外部触发通量发出一个值时,我无法弄清楚如何让延迟可靠地提前结束。Mono.firstWithValue
几乎是我想要的,但它每次都会订阅外部触发通量,这意味着可能会错过信号。
如何使用 Reactor 来实现这种任务调度?
最佳答案
您应该查看Flux.windowTimeout(int, Duration)
使用 windowTimeout(1, Duration.ofSeconds(10)) 时,生成的外部 Flux 将发出内部 Flux,每当接收到一个元素或经过 10 秒(以先到者为准)时,内部 Flux 就会结束。然后您可以将任务附加到窗口的末尾。
示例:
Flux<String> externalSignalFlux;
public Mono<String> task() {
return Mono.just("Task completed");
}
public Flux<String> windowTimeoutDemo() {
return externalSignalFlux
.windowTimeout(1, Duration.ofSeconds(10))
.concatMap(window -> window.then(task()));
}
如果需要处理信号的值:
Flux<String> externalSignalFlux;
public Mono<String> task(String signal) {
return Mono.just("Task completed: " + signal);
}
public Flux<String> windowTimeoutDemo() {
return externalSignalFlux
.windowTimeout(1, Duration.ofSeconds(10))
.concatMap(window -> window.defaultIfEmpty("repeatedTask"))
.concatMap(this::task);
}
注意:虽然 Flux.bufferTimeout(int, Duration)
看起来非常相似,每个窗口的超时仅在收到第一个元素时开始,从而导致生成的 Flux 中出现间隙。
关于project-reactor - 如何定期执行任务并响应外部信号?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77203793/