这个问题在很多论坛都被问过很多次了。但我没有看到适合我的答案。我正在尝试在我的 spring 批处理实现中实现多线程步骤。
有一个包含 10 万条记录的暂存表
想要在 10 个线程中处理它,每个线程的提交间隔为 300 - 因此在任何时间点都有 3000 条记录。
我定义了一个任务执行器并在我想要多线程的步骤中引用了它
我的想法是,首先我会获取线程池大小 (10),并用值(可以是 1-10)更新 thread_id 列到 100k 记录中的每一个。在这种情况下,有 10 个线程和 100k 条记录,因此 10k 条记录将分配一个 id - 我正在尝试实现一个 stagingsteplistener 来执行此操作。
为此暂存表编写了一个阅读器。任务执行者将创建 10 个读者,每个读者必须读取 300 条不同的记录并处理它们——现在我如何传递一个公共(public) ID 在步骤监听器和读取器之间,以便每个线程都有 它自己要处理的记录集。
到目前为止,我只有一个 JVM。因此,我正在考虑在多线程步骤本身中执行此操作,而不是考虑基于分区的方法。
请帮忙......
我引用了 pro spring batch book 并创建了一个暂存步骤监听器,它使用如下作业参数从作业配置 xml 中接受运行 id
<beans:bean id="stagingStepListener"
class="com.apress.springbatch.statement.listener.StagingStepListener" scope="step">
<beans:property name="dataSource" ref="dataSource"/>
<beans:property name="tableName" value="transaction"/>
<beans:property name="whereClause"
value="where jobId is null and processed is null"/>
<beans:property name="jobId" value="#{jobParameters[run.id]}"/>
</beans:bean>
我找不到的是这个?这个“run.id”来自哪里。我在书中的任何地方都没有看到这一点。我在我的 spring 批处理中复制了相同的实现,当我运行它时,我看到异常说 run.id 不可识别。请帮助我了解如何执行此操作?
最佳答案
- 我找不到的是这个?这个“run.id”是从哪里来的
作业参数
这只是您传递给 jobParameters 的一个参数。通常每个实例使用不同的 run.id
(常规名称),因为框架无法知道对 JobParameters 的哪些更改使其成为“下一个”作业实例。
您可以将此“run.id”传递给 jobParameters:
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters()
看看 JobParametersIncrementer
的 documentation了解更多详情。
- 我如何在步骤监听器和读取器之间传递公共(public) ID,以便每个线程都有自己的一组记录来处理
不要
这是一条非常危险的路线,因为 Step 中的许多参与者(例如读者和作者)都是有状态的,如果状态没有按线程隔离,那么这些组件在多线程 Step 中是不可用的。特别是 Spring Batch 中的大多数现成的读取器和写入器并不是为多线程使用而设计的。
分区
我建议使用 Partitioning .它比看起来简单得多,您仍然可以为其使用多个线程。查看使用 partitioning 的示例批处理作业,来自“Spring Batch samples”是为了:
使用 PartitionHandler SPI 显示多线程步骤执行。该示例使用 TaskExecutorPartitionHandler 来分散读取某些文件的工作跨多个线程,每个线程执行一个 Step。关键组件是负责划分工作的 PartitionStep 和 MultiResourcePartitioner。请注意,被分区的步骤中的读取器和写入器是步骤范围内的,因此它们的状态不会在执行线程之间共享。
关于java - Spring Batch 多线程 - 如何让每个线程读取唯一记录?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9087691/