我正在实现 spring 批处理作业以使用分区方法处理数据库表中的数百万条记录,如下所示 -
这种方法是否合适,或者对于这种情况有更好的方法吗?由于某些分区代码可以比其他分区代码具有更多的记录数,因此具有更多记录的分区代码可能比具有较少记录的分区代码需要更多的时间来处理。
是否可以创建分区/线程来处理线程 1 进程 1-1000、线程 2 进程 1001-2000 等?
我如何控制创建的线程数,因为分区代码可以在 100 左右,我只想在 5 次迭代中创建 20 个线程和进程?
如果一个分区发生故障会发生什么,所有处理都会停止并恢复吗?
以下是配置——
<bean id="MyPartitioner" class="com.MyPartitioner" />
<bean id="itemProcessor" class="com.MyProcessor" scope="step" />
<bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step" >
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
<property name="rowMapper">
<bean class="com.MyRowMapper" scope="step"/>
</property>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
<property name="corePoolSize" value="20"/>
<property name="maxPoolSize" value="20"/>
<property name="allowCoreThreadTimeOut" value="true"/>
</bean>
<batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
<batch:tasklet transaction-manager="transactionManager">
<batch:chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
</batch:tasklet>
</batch:step>
<batch:job id="myjob">
<batch:step id="mystep">
<batch:partition step="Step1" partitioner="MyPartitioner">
<batch:handler grid-size="20" task-executor="taskExecutor"/>
</batch:partition>
</batch:step>
</batch:job>
分区器 -
public class MyPartitioner implements Partitioner{
@Override
public Map<String, ExecutionContext> partition(int gridSize)
{
Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
List<String> codes = getCodes();
for (String code : codes)
{
ExecutionContext context = new ExecutionContext();
context.put("code", code);
partitionMap.put(code, context);
}
return partitionMap;}}
谢谢
最佳答案
我会说这是正确的方法,我不明白为什么你需要每 1000 个项目有一个线程,如果你按照唯一的分区代码进行分区并且有 1000 个项目的块,你将在每个线程的 1000 个项目上进行交易,这是 IMO 好的。
你有很多每个分区的代码和分区甚至更多,通过
为每 1000 个相同的分区代码创建新的子上下文(即
具有即 2200 条记录的分区代码的方式,您将调用 3
具有上下文参数的线程:1=> partition_key=key1,skip=0,
count=1000, 2=>partition_key=key1, skip=1000, count=1000 和
3=>partition_key=key1, skip=2000, count=1000) 如果这是你
想要,但没有它我还是会去
ThreadPoolTaskExecutor
控制创建它时传递给分区步骤。你有方法 setCorePoolSize()
您可以将其设置为 20,您将获得最多 20 个线程。下一个细粒度配置是 grid-size
它告诉我们将从完整的分区映射中创建多少个分区。这是explanation of grid size .所以分区就是划分工作。之后,您的线程配置将定义实际处理的并发性。 希望我能回答你所有的问题,因为有很多。
案例 1 的示例- 也许有错误,但只是为了得到想法:
public class MyPartitioner implements Partitioner{
@Override
public Map<String, ExecutionContext> partition(int gridSize)
{
Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
Map<String, int> codesWithCounts = getCodesWithCounts();
for (Entry<String, int> codeWithCount : codesWithCounts.entrySet())
{
for (int i = 0; i < codeWithCount.getValue(); i + 1000){
ExecutionContext context = new ExecutionContext();
context.put("code", code);
context.put("skip", i);
context.put("count", 1000);
partitionMap.put(code, context);
}
}
return partitionMap;
}
Adn 比你翻页 1000,你从上下文中得到你应该跳过多少,在 2200 的例子中是:0, 1000, 2000。
关于spring - 使用 Spring Batch 分区处理海量数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29226350/