java - 使用 CompleteableFuture 和 Spring Transaction 耗尽池

标签 java spring-boot transactions spring-transactions

我正在尝试使用 CompleteableFutures 将数据库快速加载到内存中。我在方法级别启动 Spring 事务:

@Transactional()
    private void loadErUp() {
        StopWatch sw = StopWatch.createStarted();
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        final ZonedDateTime zdt = ZonedDateTime.now(ZoneId.of(ZoneOffset.UTC.getId())).minusMinutes(REFRESH_OVERLAP);

        for (long i = 1; i < 12 + 1; i++) {
            Long holder = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                this.loadPartition(holder, zdt);
            }, this.forkJoinPool);
            calls.add(future);
        }
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        log.info("All data refreshed in ({}ms) since:{}", sw.getTime(), zdt.format(DateTimeFormatter.ISO_INSTANT));
    }

然后通过

将每个线程附加到主事务

TransactionSynchronizationManager.setActualTransactionActive(true);

private <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        log.debug("Refresh thread start:{}", partitionKey);
        TransactionSynchronizationManager.setActualTransactionActive(true);
        StopWatch sw = StopWatch.createStarted();

        try (Stream<Authority> authorityStream = aSqlRepo.findByPartitionKeyAndLastUpdatedTimeStampAfter(partitionKey, zdt)) {
            long count = authorityStream.peek(a -> {
                this.authorityRepository.set(this.GSON.fromJson(a.getJsonData(), AssetAuthority.class));
            }).count();
            log.info("Partition {} refreshed in ({}ms) with {} items.", partitionKey, sw.getTime(), count);
            return count;
        }
    }

所以我每 30 秒运行一次这个批处理作业,在第 9 次运行时我得到 4 个线程,然后它挂起(12*8 次运行 = 96),因为它正在等待池打开。我得到:

Unable to acquire JDBC Connection; Unable to fetch a connection in 30 seconds, none available[size:100; busy:100; idle:0; lastwait:30000].

很明显连接没有提交。我认为这可能是因为我有自己的 ForkJoinPool,但是,我关闭了所有这些线程并且它似乎没有帮助。我还在 loadPartition() 方法下放置了另一个方法,但这似乎也无济于事。还有另一个线程讨论如何让交易工作,但我的工作,他们只是不提交。

最佳答案

如果你想让每个 #loadPartition 在它自己的线程和它自己的事务中运行,你需要:

  1. #loadPartition标记为@Transactional
  2. 调用proxied #loadPartition 方法,以便@Transactional 起作用。您可以通过 Autowiring 或从另一个代理类调用方法来完成此操作

事务没有传播到异步线程,因为(重要!)that method is not getting proxied .

所以它看起来像:

@Component
public class MyLoaderClass {

    // Autowire in this with constructor injection or @Autowired
    private MyLoaderClass myLoaderClass;

    // Removed @Transactional annotation
    public void loadErUp() {
        myLoaderClass.loadPartition(holder, zdt);
        ...
    }

    // 1) Add the @Transactional annotation to #loadPartition
    // 2) Make public to use self-autowiring (or refactored class, per link above)
    @Transactional
    public <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        ...
        // Can remove TransactionSyncManager call
        ...
    }

}

您还需要确保您的批处理作业在未确保最后一个作业完成的情况下不会运行。您可以通过 using the @Scheduled annotation 轻松解决此问题为您的表负载确保运行不会“重叠”。

关于java - 使用 CompleteableFuture 和 Spring Transaction 耗尽池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53710191/

相关文章:

java - 如何使用 void 方法而不打印?

java - Spring Boot Azure 部署无法更新代码

java - Spring Boot + ActiveMQ 以编程方式动态订阅主题

java - Spring 应用程序似乎没有持久化数据

java - 将警报推送到 Java 中的通知托盘应用程序

java - 如何将 SimpleTimeLimiter (Guava) 与 Vaadin 一起使用

grails - 如何配置grails以将单个事务管理器用于多个数据源?

java - 数组越界并将 list<Object> 转换为 string[]

sql-server-2005 - 如何使用跨多个服务器链接的过程的事务?

c# - 有没有办法将 TransactionScope 与现有连接一起使用?