java - 在 Spring Webflux 中执行阻塞 JDBC 调用

标签 java spring-boot spring-webflux project-reactor

我正在使用 Spring Webflux 和 Spring 数据 jpa,使用 PostgreSql 作为后端数据库。 我不想在进行 findsave 等数据库调用时阻塞主线程。 为了达到同样的目的,我在 Controller 类和 jdbcScheduler 服务类中有一个主调度器。

我定义它们的方式是:

@Configuration
@EnableJpaAuditing
public class CommonConfig {

    @Value("${spring.datasource.hikari.maximum-pool-size}")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() {
        return Schedulers.parallel();
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }
}

现在,在我的服务层执行获取/保存调用时,我会:

    @Override
    public Mono<Config> getConfigByKey(String key) {
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
        return Flux
            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> addConfigs(List<Config> configList) {
        return Flux.fromIterable(configRepository.saveAll(configList))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

在 Controller 中,我做:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    }

这是正确的吗?和/或有更好的方法吗?

我的理解是:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);

是该任务将在 jdbcScheduler 线程上运行,稍后结果将在我的主要并行 scheduler 上发布。这种理解是否正确?

最佳答案

您对 publishOnsubscribeOn ( see reference documentation in the reactor project about those operators ) 的理解是正确的。

如果您调用阻塞库而不在特定调度程序上调度工作,这些调用将阻塞为数不多的可用线程之一(默认情况下,Netty 事件循环),您的应用程序将只能同时处理少数几个请求。

现在我不确定您这样做的目的是什么。

首先,parallel scheduler is designed for CPU bound tasks ,这意味着它们很少,与 CPU 内核一样多(或更多)。在这种情况下,这就像将线程池大小设置为常规 Servlet 容器上的核心数。您的应用将无法处理大量并发请求。

即使您选择更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环好,后者是在 Spring WebFlux 中本地安排请求处理的地方。

如果您的最终目标是性能和可伸缩性,那么在响应式(Reactive)应用程序中包装阻塞调用可能比常规 Servlet 容器的性能更差。

您可以改为使用 Spring MVC 并且:

  • 在处理阻塞库(如 JPA)时使用通常的阻塞返回类型
  • 当您不依赖于此类库时,使用 MonoFlux 返回类型

这不是非阻塞的,但仍然是异步的,您将能够并行执行更多工作而无需处理复杂性。

关于java - 在 Spring Webflux 中执行阻塞 JDBC 调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54864088/

相关文章:

java - ImageIO.read(inputstream) 在 BufferedImage 中给出 null

java - 如何在 ListView 中创建工作复选框(其中包含来自 sqlite 数据库的项目)

java - 值在 java spring Controller 中使用 post 方法接收 null

java - 多种类型的Java列表是否可能

spring - 如何从Spring Webflux Controller 捕获异常?

java - 我已声明为同步的保护方法,但它给出的输出似乎是该方法未同步

java - JPA findby 过滤子字符串列表

mysql - Spring - 如何在所有 URL 中显示子菜单

spring-webflux - 如何在 Flux 中迭代一个对象并对其进行操作?

java - Spring Web 客户端 : How to stream large byte[] to file?