java - Spring Boot-2.1.3 : Parallel Methods Invocation with @Async with CompletableFuture

标签 java multithreading spring-boot parallel-processing stream

下面是我的代码,其中试图并行化 4 个方法调用,因为每个方法彼此独立并执行一些内存密集型统计操作。

@EnableAsync
@Configuration
public class Config {
   
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        
        executor.initialize();
        return executor;
    }
}

class OrderStatsService{

    public CumulativeStats compute() {
        log.info("CumulativeResult compute started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        List<Order> orders = getOrders();// API Call to fetch large set of orders size could be around 100k
        
        CumulativeResult cumulativeResult = new CumulativeResult();

        CompletableFuture<Long> stats1 = getStats1(orders);
        CompletableFuture<List<String>> result2 = getStats2(orders);
        CompletableFuture<Double> result3 = getStats3(orders);
        CompletableFuture<Map<String,String>> result4 = getStats4(orders);

        cumulativeResult.setStats1(stats1);
        cumulativeResult.setStats2(stats2);
        cumulativeResult.setStats3(stats3);
        cumulativeResult.setStats4(stats4);
        return cumulativeResult;
    }
    
    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Long> getStats1(var orders) {
    log.info("getStats1 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<String>> getStats2(var orders) {
    log.info("getStats2 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Double>  getStats3(var> orders) {
    log.info("getStats3 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Map<String,String>> getStats4(var orders) {
    log.info("getStats4 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

}

我得到了预期的结果,但注意到调用 compute() 的主线程正在执行其他 4 个方法 getStats1,getStats2getStats3getStats4 方法。

 CumulativeResult compute started at 1655783237437, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
 getStats1 started at 1655783238022, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
 getStats2 started at 1655783238024, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
 getStats3 started at 1655783463062, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
 getStats4 started at 1655783238085, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28

我认为当我们将 CompletableFuture 用于带有 @EnableAsync 配置的 @Async 方法时,这些方法将被分配一个新线程来执行,有人可以解释一下这是并行方法调用的预期行为吗?我的配置有什么问题吗?或者,如果这是我们在同一线程中执行 caller 方法和 async 时如何实现并行性的预期行为?

最佳答案

所需的更改可以在代码中完成。

第 1 步:执行多线程的第一步是正确设置线程池配置。

这里是如何设置线程大小的示例

@Configuration
@EnableAsync
public class ThreadPoolConfiguration {

@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setThreadNamePrefix("thread-");
    threadPoolTaskExecutor.setCorePoolSize(100);
    threadPoolTaskExecutor.setMaxPoolSize(120);
    threadPoolTaskExecutor.setQueueCapacity(100000);
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;
}

第二步:使用@Async注解

首先,让我们回顾一下规则。

  • @Async 注释 - 它必须仅应用于公共(public)方法。
  • @Async 的自调用将不起作用,这意味着从同一类中调用异步方法将不起作用。

有关@Async 的更多信息 you can go through

解决方案:

  • 将所有带有@Async 的方法保留在不同的服务中,您可以 从 OrderStatsService
  • 调用它
  • 您还可以做的一件事是从 Controller 和所有 @Async 方法可以保留在服务类中。
  • 用像 @Async("taskExecutor") 这样的 Bean 名称标记你的注解>

任何一种方法都适合您。

关于java - Spring Boot-2.1.3 : Parallel Methods Invocation with @Async with CompletableFuture,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72695309/

相关文章:

java - HttpURLConnection.getRequestProperties() 抛出 IllegalStateException : "Already connected"

java - 无法在 TreeNode 类中向左/向右移动

ios - 如果为 setKeepAliveTimeout 方法设置 15 分钟,VOIP 应用程序不会及时唤醒

java - Bean 验证不适用于 spring webflux

java - 使用准备好的语句更改 MySQL 数据库中的数据

java - JUnit 测试 SpringJUnit4ClassRunner

java - 理解java多线程中的公平锁

c++ - 为什么 “memory_order_relaxed”在我的系统中被视为 “memory_order_seq_cst” [C++]

java - Springboot 与 Spring OAuth2

Spring Boot JSP 未找到