TL;DR
给定一个包含一百万条记录的文件,其中文件中的每一行都需要执行大量逻辑,那么读取文件并完成在每一行上应用逻辑的最快方法是什么。我使用了带有文件读取器的多线程步骤,其 read
方法是synchronized
读取文件并使用 AsynItemProcessor
以便在自己的线程中处理记录。
我的期望是 AsynItemProcessor
一旦有来自读取器的记录需要处理,就应该立即开始。每个记录应该在自己的线程中处理;但是,在我下面的示例中似乎并非如此
我的 Spring 批处理作业中有一个步骤使用 TaskExecutor
使用 20 个线程和 10000 次提交间隔来读取文件。我也在使用 AsycnItemProcessor
和AsyncItemWriter
因为数据处理有时可能比从文件中读取一行所需的时间更长。
<step id="aggregationStep">
<tasklet throttle-limit="20" task-executor="taskExecutor">
<chunk reader="fileReader"
processor="asyncProcessor" writer="asyncWriter"
commit-interval="10000" />
</tasklet>
</step>
地点:
-
fileReader
是一个扩展FlatFileItemReader
的类和read
方法是synchronized
只需调用super.read
在其中。 -
asyncProcessor
只是一个AsyncItemProcessor
从文件中传递每一行并按键对其进行分组的 bean,并将其存储到包含Map<String,BigDecimal>
的单例 bean 中目的。换句话说,处理器只是将文件数据按几列进行分组,并将这些数据存储在内存中。 -
asyncWriter
无非是AsyncItemWriter
包装 no 操作ItemWriter
在其中。换句话说,该作业不需要进行任何类型的写入,因为处理器本身正在进行聚合并将数据存储在内存中。 (Map
)。 - 请注意
AsynItemProcessor
有它的ThreadPoolTaskExecutor
与corePoolSize=10
和maxPoolSize=20
和Step
有自己的ThreadPoolTaskExecutor
与corePoolSize=20
和maxPoolSize=40
通过上述设置,我的期望是读取和处理将并行发生。像这样的东西:
- FileReader 从文件中读取一条记录并将其传递给处理器
-
AsyncItemProcessor
执行聚合。因为它是AsyncItemProcessor
,调用process
的线程理想情况下,方法应该可以自由地执行其他工作? - 最后,
AsynItemWriter
会得到Future
并提取数据,但不执行任何操作,因为委托(delegate)是无操作ItemWriter
.
但是当我添加一些日志时,我没有看到我期望的内容:
2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 2500 records
2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 5000 records
2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 7501 records
2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 10000 records
2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 12500 records
2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 15000 records
... 更多这样的行被打印,直到整个文件被读取。只有在读取文件后,我才开始看到处理器开始工作:
2019-09-07 10:06:53 INFO FileProcessor:83 - Finished processing 2500 records
2019-09-07 10:08:28 INFO FileProcessor:83 - Finished processing 5000 records
2019-09-07 10:10:04 INFO FileProcessor:83 - Finished processing 7500 records
2019-09-07 10:11:40 INFO FileProcessor:83 - Finished processing 10000 records
2019-09-07 10:13:16 INFO FileProcessor:83 - Finished processing 12500 records
2019-09-07 10:14:51 INFO FileProcessor:83 - Finished processing 15000 records
底线:为什么处理器在文件完全读取之前才启动?不管怎样ThreadPoolTaskExecutor
用于 AsynItemProcessor
的参数或整个step
,读取总是先完成,然后才开始处理。
最佳答案
这就是面向 block 的处理的工作原理。该步骤将读取变量中的 X 个项目(其中 X 是提交间隔),然后执行处理/写入。您可以在 code of ChunkOrientedTasklet
中看到这一点.
在多线程步骤中,每个 block 将由一个线程处理。
关于java - Spring 批处理 : Multithreaded step with AsyncItemProcessor doesn't run in parallel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57840342/