java - Spring Batch 多线程 - 如何让每个线程读取唯一记录?

标签 java multithreading spring spring-batch

这个问题在很多论坛都被问过很多次了。但我没有看到适合我的答案。我正在尝试在我的 spring 批处理实现中实现多线程步骤。

  1. 有一个包含 10 万条记录的暂存表

  2. 想要在 10 个线程中处理它,每个线程的提交间隔为 300 - 因此在任何时间点都有 3000 条记录。

  3. 我定义了一个任务执行器并在我想要多线程的步骤中引用了它

  4. 我的想法是,首先我会获取线程池大小 (10),并用值(可以是 1-10)更新 thread_id 列到 100k 记录中的每一个。在这种情况下,有 10 个线程和 100k 条记录,因此 10k 条记录将分配一个 id - 我正在尝试实现一个 stagingsteplistener 来执行此操作。

  5. 为此暂存表编写了一个阅读器。任务执行者将创建 10 个读者,每个读者必须读取 300 条不同的记录并处理它们——现在我如何传递一个公共(public) ID 在步骤监听器和读取器之间,以便每个线程都有 它自己要处理的记录集。

到目前为止,我只有一个 JVM。因此,我正在考虑在多线程步骤本身中执行此操作,而不是考虑基于分区的方法。

请帮忙......

我引用了 pro spring batch book 并创建了一个暂存步骤监听器,它使用如下作业参数从作业配置 xml 中接受运行 id

<beans:bean id="stagingStepListener"
class="com.apress.springbatch.statement.listener.StagingStepListener" scope="step">
<beans:property name="dataSource" ref="dataSource"/>
<beans:property name="tableName" value="transaction"/>
<beans:property name="whereClause"
value="where jobId is null and processed is null"/>
<beans:property name="jobId" value="#{jobParameters[run.id]}"/>
</beans:bean>

我找不到的是这个?这个“run.id”来自哪里。我在书中的任何地方都没有看到这一点。我在我的 spring 批处理中复制了相同的实现,当我运行它时,我看到异常说 run.id 不可识别。请帮助我了解如何执行此操作?

最佳答案


  • 我找不到的是这个?这个“run.id”是从哪里来的

作业参数

这只是传递给 jobParameters 的一个参数。通常每个实例使用不同的 run.id(常规名称),因为框架无法知道对 JobParameters 的哪些更改使其成为“下一个”作业实例。

您可以将此“run.id”传递给 jobParameters:

new JobParametersBuilder().addLong("run.id", 1L).toJobParameters()

看看 JobParametersIncrementerdocumentation了解更多详情。


  • 我如何在步骤监听器和读取器之间传递公共(public) ID,以便每个线程都有自己的一组记录来处理

不要

这是一条非常危险的路线,因为 Step 中的许多参与者(例如读者和作者)都是有状态的,如果状态没有按线程隔离,那么这些组件在多线程 Step 中是不可用的。特别是 Spring Batch 中的大多数现成的读取器和写入器并不是为多线程使用而设计的。

分区

我建议使用 Partitioning .它比看起来简单得多,您仍然可以为其使用多个线程。查看使用 partitioning 的示例批处理作业,来自“Spring Batch samples”是为了:

使用 PartitionHandler SPI 显示多线程步骤执行。该示例使用 TaskExecutorPartitionHandler 来分散读取某些文件的工作跨多个线程,每个线程执行一个 Step。关键组件是负责划分工作的 PartitionStep 和 MultiResourcePartitioner。请注意,被分区的步骤中的读取器和写入器是步骤范围内的,因此它们的状态不会在执行线程之间共享。

关于java - Spring Batch 多线程 - 如何让每个线程读取唯一记录?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9087691/

相关文章:

java - Web 元素之间的等待时间 [Webdriverwait Selenium]

android - 试图获取下载文件大小,但出现错误

python-3.x - 在threading.thread中,为什么args末尾带逗号

java - Spring MVC htmlspecialchars() 替代方案

java - 带有@Loggable 注释的方法从不打印方面日志

java - @RequestMapping 在 Maven War 覆盖上的奇怪行为

java - Eclipse 中的 Storm 集群关闭

java - SSLSocketFactory 忽略服务器证书

java - 递归语句调用另一个递归语句?

python - Python 中的计时器对象