java - Spring 批处理 : Multithreaded step with AsyncItemProcessor doesn't run in parallel

标签 java spring spring-batch threadpoolexecutor itemprocessor

TL;DR

给定一个包含一百万条记录的文件,其中文件中的每一行都需要执行大量逻辑,那么读取文件并完成在每一行上应用逻辑的最快方法是什么。我使用了带有文件读取器的多线程步骤,其 read方法是synchronized读取文件并使用 AsynItemProcessor以便在自己的线程中处理记录。

我的期望是 AsynItemProcessor一旦有来自读取器的记录需要处理,就应该立即开始。每个记录应该在自己的线程中处理;但是,在我下面的示例中似乎并非如此

<小时/>

我的 Spring 批处理作业中有一个步骤使用 TaskExecutor使用 20 个线程和 10000 次提交间隔来读取文件。我也在使用 AsycnItemProcessorAsyncItemWriter因为数据处理有时可能比从文件中读取一行所需的时间更长。

<step id="aggregationStep">
    <tasklet throttle-limit="20" task-executor="taskExecutor">
        <chunk reader="fileReader"
            processor="asyncProcessor" writer="asyncWriter"
            commit-interval="10000" />
    </tasklet>
</step>

地点:

  1. fileReader是一个扩展 FlatFileItemReader 的类和 read方法是 synchronized 只需调用 super.read在其中。
  2. asyncProcessor只是一个AsyncItemProcessor从文件中传递每一行并按键对其进行分组的 bean,并将其存储到包含 Map<String,BigDecimal> 的单例 bean 中目的。换句话说,处理器只是将文件数据按几列进行分组,并将这些数据存储在内存中。
  3. asyncWriter无非是AsyncItemWriter包装 no 操作 ItemWriter在其中。换句话说,该作业不需要进行任何类型的写入,因为处理器本身正在进行聚合并将数据存储在内存中。 (Map)。
  4. 请注意 AsynItemProcessor有它的ThreadPoolTaskExecutorcorePoolSize=10maxPoolSize=20Step有自己的ThreadPoolTaskExecutorcorePoolSize=20maxPoolSize=40

通过上述设置,我的期望是读取和处理将并行发生。像这样的东西:

  1. FileReader 从文件中读取一条记录并将其传递给处理器
  2. AsyncItemProcessor执行聚合。因为它是AsyncItemProcessor ,调用 process 的线程理想情况下,方法应该可以自由地执行其他工作?
  3. 最后,AsynItemWriter会得到Future并提取数据,但不执行任何操作,因为委托(delegate)是无操作 ItemWriter .

但是当我添加一些日志时,我没有看到我期望的内容:

2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 2500 records 2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 5000 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 7501 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 10000 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 12500 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 15000 records

... 更多这样的行被打印,直到整个文件被读取。只有在读取文件后,我才开始看到处理器开始工作:

2019-09-07 10:06:53 INFO FileProcessor:83 - Finished processing 2500 records 2019-09-07 10:08:28 INFO FileProcessor:83 - Finished processing 5000 records 2019-09-07 10:10:04 INFO FileProcessor:83 - Finished processing 7500 records 2019-09-07 10:11:40 INFO FileProcessor:83 - Finished processing 10000 records 2019-09-07 10:13:16 INFO FileProcessor:83 - Finished processing 12500 records 2019-09-07 10:14:51 INFO FileProcessor:83 - Finished processing 15000 records

底线:为什么处理器在文件完全读取之前才启动?不管怎样ThreadPoolTaskExecutor用于 AsynItemProcessor 的参数或整个step ,读取总是先完成,然后才开始处理。

最佳答案

这就是面向 block 的处理的工作原理。该步骤将读取变量中的 X 个项目(其中 X 是提交间隔),然后执行处理/写入。您可以在 code of ChunkOrientedTasklet 中看到这一点.

在多线程步骤中,每个 block 将由一个线程处理。

关于java - Spring 批处理 : Multithreaded step with AsyncItemProcessor doesn't run in parallel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57840342/

相关文章:

java - 将日本日期格式(2013年11月24日)转换为普通日期格式(2013-11-24)

java - J2ME背景音乐

java - Websockets、SockJs、Stomp、Spring、RabbitMQ、自动删除用户特定队列

java - spring security Remember Me logut 不清除 cookie

java - Spring 批处理 : different job launcher for different jobs

java - android中的android os networkonmainthreadexception错误

java - 将我现有的服务器连接到 GCM

java - Spring MVC Controller 无法加载我的 HTML 页面

java - 手动更改 Spring Batch 元数据表是个坏主意吗?

spring - 如何在 Websphere ND 8.5 中运行 Spring Batch 管理