project-reactor - 如何定期执行任务并响应外部信号?

标签 project-reactor

想象一个重复执行的任务,每次执行之间最多有 10 秒的延迟。还有一个外部信号会导致任务立即执行。 (外部信号后,计时器是否重置回10秒并不重要。)

我尝试过两种建模方式,但都不理想:

  1. 使用 Flux.interval(Duration.ofSeconds(10)) 进行基于计时器的执行。使用由 Sinks.many() 创建的热通量,它接收外部信号。合并两个通量,并在合并结果的 concatMap 运算符中执行任务。

    这种方法的问题是信号可能会在执行之前“堆积”。添加 onBackPressureLatest() 可以缓解堆耗尽,但如果任务的一次执行时间超过延迟间隔,则无法阻止多个信号排队。

  2. 添加一个delay(),后跟一个repeat()。这里的挑战是,当热外部触发通量发出一个值时,我无法弄清楚如何让延迟可靠地提前结束。 Mono.firstWithValue 几乎是我想要的,但它每次都会订阅外部触发通量,这意味着可能会错过信号。

如何使用 Reactor 来实现这种任务调度?

最佳答案

您应该查看Flux.windowTimeout(int, Duration)使用 windowTimeout(1, Duration.ofSeconds(10)) 时,生成的外部 Flux 将发出内部 Flux,每当接收到一个元素或经过 10 秒(以先到者为准)时,内部 Flux 就会结束。然后您可以将任务附加到窗口的末尾。

示例:

Flux<String> externalSignalFlux;

public Mono<String> task() {
    return Mono.just("Task completed");
}

public Flux<String> windowTimeoutDemo() {
    return externalSignalFlux
            .windowTimeout(1, Duration.ofSeconds(10))
            .concatMap(window -> window.then(task()));
}

如果需要处理信号的值:

Flux<String> externalSignalFlux;

public Mono<String> task(String signal) {
    return Mono.just("Task completed: " + signal);
}

public Flux<String> windowTimeoutDemo() {
    return externalSignalFlux
            .windowTimeout(1, Duration.ofSeconds(10))
            .concatMap(window -> window.defaultIfEmpty("repeatedTask"))
            .concatMap(this::task);
}

注意:虽然 Flux.bufferTimeout(int, Duration)看起来非常相似,每个窗口的超时仅在收到第一个元素时开始,从而导致生成的 Flux 中出现间隙。

关于project-reactor - 如何定期执行任务并响应外部信号?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77203793/

相关文章:

java - Reactor 有没有办法忽略错误信号?

java - 寻找优雅的Reactor热流生成方式

java - Webflux如何从流内部设置cookie

java - R2DBC 数据库客户端未调用 doOnSuccess 或因嵌套调用而终止

kotlin - 在 Kotlin 中使用 Reactor 分页 Gitlab API

java - 项目 Reactor 中的 Parallel Flux 与 Flux

java - Reactor Flux 发布方法

spring-webflux - 从 Mono.fromCallable 返回 Mono.empty()

带有 WebFlux 的 Spring Boot 总是在测试中抛出 403 状态

java - Flux Reactor - 每小时的简单时间表