project-reactor - 在计划任务中使用 Flux

标签 project-reactor spring-webflux

我正在处理一个 Spring Webflux 项目,并且在尝试在计划任务中发布和使用 Flux 时遇到了问题。

@Scheduled(fixedRate = 20*1000)
fun updateNews() {
    try {
        logger.info("Automatic Update at: ${LocalDateTime.now()}")
        articleRepository.saveAll(
                sourceRepository.findAll().publishOn(Schedulers.parallel())
                        .map { source -> source.generate() }
                        .flatMap { it?.read() ?: Flux.empty() })
                        .timeout(Duration.ofSeconds(20)
        ).subscribeOn(Schedulers.parallel())
    } catch(e: Throwable) {
        logger.log(Level.SEVERE, "Error in Scheduler", e)
    }
}

我配置的调度程序:
ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))

除非我故意阻止,否则此任务永远不会完成:
.then().block()

我最初没有打扰发布/订阅调度程序的直接引用,并且我已经尝试了所有看似合理的选项但没有效果。

我的日志事件发生了,但似乎当来自调度程序的此任务的线程死亡时,通量也是垃圾;即使一旦我指定了 publishOn 和 subscribeOn 行为,它们就应该在自己的线程池中?

我想让这个 Action 完全被动,任何建议将不胜感激。

最佳答案

@Scheduled未与 Flux 集成,所以它不知道如何处理 Flux你应该退货吗?结合这一事实,在 Reactor(以及一般的 Reactive Streams)中,通常什么都不会发生,直到您 subscribe() ,然后您就可以开始查看出了什么问题。
block()实际上是 subscribe() 的一种形式,这就是将它添加到代码后它起作用的原因。这实际上可能是最好的选择,因为您需要将一段响应式(Reactive)代码(来自 ReactiveRepository )桥接到命令式阻塞世界(您的 @Scheduled fun )。

关于project-reactor - 在计划任务中使用 Flux,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46762242/

相关文章:

Java react 器-链Mono <Void>与另一个产生Mono <Object>的异步任务

java - 在 Spring Boot Webflux 中生成服务器发送的事件

java - 在转换方法 WebFlux 中从 lambda 返回 null 或可为 null 的内容

java - 将 RxJava Single 转换为 Mono

java - Kubernetes Ingress 应该与 Spring Cloud Gateway 共存吗?

java - 将 List<Mono<String> 转换为 Flux<String>

java - 合并热通量源

spring-webflux - spring webflux webfilter 去除请求体

spring - 不明白如何让通量订阅在 kotlin 中工作

java - 单线程 Flux 中的 Mono