java - 线程执行器不工作 Spring 批处理

标签 java multithreading spring-batch

我有一个持续 9 分钟的处理,我想通过使用线程执行器来减少执行时间。

我的阅读器读取数据库中的 1200 行并创建 UNPIVOT,这让我可以在 csv 文件中写入 56 036 行。

我尝试了几种方法,但时间上没有区别,我的印象是没有考虑到配置。

@Bean
@StepScope
public ItemReader<DmNebefPdhExportRetenuCSV> datamartEffRetenuItemReader(
        @Value("#{jobParameters['dateExport']}") Date dateExport) throws Exception {
    PagingQueryProvider query = createEffRetenuQuery();
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("dateExport", dateExport);

    JdbcPagingItemReader<DmNebefPdhExportRetenuCSV> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(sdmDataSource);
    reader.setParameterValues(parameters);
    reader.setQueryProvider(query);
    reader.setFetchSize(1000);
    reader.setPageSize(1000);
    reader.setRowMapper(new BeanPropertyRowMapper<>(DmNebefPdhExportRetenuCSV.class));

    return reader;
}

@Bean
@StepScope
FlatFileItemWriter<DmNebefPdhExportRetenuCSV> exportEffRetenuItemWriter(
        @Value("#{jobParameters['dateExport']}") Date dateExport) {
    // Construction du Header
    StringHeaderWriter headerWriter = new StringHeaderWriter(EXPORT_EFF_RETENU_CSV_HEADER);

    String newExportFileVersion = getExportRetenuVersion(dateExport);

    // Nom du fichier d'export
    String csvFileName = createEffRetenuExportFileName(dateExport, newExportFileVersion);

    // Chemin complet d'enregistrement du fichier
    String exportFilePath = String.join("/", exportArchiveCreRetenuPath, csvFileName);

    // Définition du délimiteur et des champs à mapper
    LineAggregator<DmNebefPdhExportRetenuCSV> lineAggregator = effRetenuLineAggregator();

    // Put dans le context pour récupérer dans le listener
    exportEffRetenuJobListener.getJobExecution().getExecutionContext().put("exportFilePath", exportFilePath);
    exportEffRetenuJobListener.getJobExecution().getExecutionContext().put("csvFileName", csvFileName);
    exportEffRetenuJobListener.getJobExecution().getExecutionContext().put("newExportFileVersion",
            newExportFileVersion);

    FlatFileItemWriter<DmNebefPdhExportRetenuCSV> csvFileWriter = new FlatFileItemWriter<>();
    csvFileWriter.setShouldDeleteIfEmpty(true);
    csvFileWriter.setHeaderCallback(headerWriter);
    csvFileWriter.setResource(new FileSystemResource(exportFilePath));
    csvFileWriter.setLineAggregator(lineAggregator);

    return csvFileWriter;

}

@Bean
public Step exportCSVStep() throws Exception {
    return stepBuilderFactory.get("exportCSVStep")
            .<DmNebefPdhExportRetenuCSV, DmNebefPdhExportRetenuCSV>chunk(100)
            .reader(datamartEffRetenuItemReader(WILL_BE_INJECTED))
            .listener(readListener)
            .writer(exportEffRetenuItemWriter(WILL_BE_INJECTED))
            .listener(writeListener)
            .taskExecutor(taskExecutor())
            .throttleLimit(50)
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor ();
    taskExecutor.setMaxPoolSize(50);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

@Bean
public Job exportEffRetenuJob() throws Exception {
    return jobBuilderFactory
            .get("exportEffRetenuJob")
            .incrementer(new RunIdIncrementer())
            .listener(exportEffRetenuJobListener)
            .flow(exportCSVStep()).end().build();
}

我还尝试过使用新的 SimpleAsyncTaskExecutor()。

将 setFetchSize() 和 setPageSize() 添加到我的阅读器中,处理时间从 9 分钟减少到 10 秒。 但ThreadExecutor似乎不起作用。

日志:

没有 TaskExecutor:12,751 秒

[2018-01-03 11:41:32,087] INFO  ExportEffRetenuJobListener - Start job exportEffRetenuJob - export month 04-2017 
[2018-01-03 11:41:44,838] INFO  ExportEffRetenuJobListener - End job : exportEffRetenuJob - export month : 04-2017 - statut : COMPLETED

使用 TaskExecutor:11,328 秒

[2018-01-03 11:42:55,439] INFO  ExportEffRetenuJobListener - Start job exportEffRetenuJob - export month 04-2017 
[2018-01-03 11:43:06,767] INFO  ExportEffRetenuJobListener - End job : exportEffRetenuJob - export month : 04-2017 - statut : COMPLETED

测试 240 000 行读/写

使用 TaskExecutor:123 秒

没有 TaskExecutor:127 秒

我认为我没有很好地配置任务执行器。

最佳答案

您必须为您的执行程序设置一个corePoolSize。默认值为 1,因此您仍然只能得到一个线程。尝试设置 taskExecutor.setCorePoolSize(50) 并重新运行您的作业。

关于java - 线程执行器不工作 Spring 批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48065089/

相关文章:

java.lang.IllegalStateException : The content of the adapter has changed but ListView did not receive a notification in android Studio

java - 四元数平滑旋转

c++ - `scoped_thread` 通过值 `std::thread`

java - 在 @BeforeStep 之前初始化测试中的 Mock

java - 如何从 JTable 1 到 JTable 2 获取值?

java - 有没有比 Libgdx 更好的 2D 游戏引擎?

python - 在 Python 中创建一个简单的聊天应用程序(套接字)

java - wait 方法是否立即放弃对 Lock 的控制

java - 如何在 Spring Batch 模块中一次读取多个文件?

java - 如何将@Configuration 和@EnableScheduling 与Spring Batch 一起使用