spring - 使用 Spring Batch 分区处理海量数据

标签 spring spring-batch

我正在实现 spring 批处理作业以使用分区方法处理数据库表中的数百万条记录,如下所示 -

  • 从分区器中的表中获取唯一的分区代码,并在执行上下文中设置相同的分区代码。
  • 使用读取器、处理器和写入器创建块步骤,以根据特定的分区代码处理记录。

  • 这种方法是否合适,或者对于这种情况有更好的方法吗?由于某些分区代码可以比其他分区代码具有更多的记录数,因此具有更多记录的分区代码可能比具有较少记录的分区代码需要更多的时间来处理。

    是否可以创建分区/线程来处理线程 1 进程 1-1000、线程 2 进程 1001-2000 等?

    我如何控制创建的线程数,因为分区代码可以在 100 左右,我只想在 5 次迭代中创建 20 个线程和进程?

    如果一个分区发生故障会发生什么,所有处理都会停止并恢复吗?

    以下是配置——
     <bean id="MyPartitioner" class="com.MyPartitioner" />
     <bean id="itemProcessor" class="com.MyProcessor" scope="step" />
     <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step" >
      <property name="dataSource" ref="dataSource"/>
      <property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
      <property name="rowMapper">
          <bean class="com.MyRowMapper" scope="step"/>
      </property>
    </bean>
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
        <property name="corePoolSize" value="20"/>
        <property name="maxPoolSize" value="20"/>
        <property name="allowCoreThreadTimeOut" value="true"/>
    </bean>
    
    <batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="itemReader"  processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
        </batch:tasklet>
    </batch:step>
    <batch:job id="myjob">
        <batch:step id="mystep">
            <batch:partition step="Step1" partitioner="MyPartitioner">
                <batch:handler grid-size="20" task-executor="taskExecutor"/>
            </batch:partition>
        </batch:step>
    </batch:job>
    

    分区器 -
    public class MyPartitioner implements Partitioner{
    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
    Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
    List<String> codes = getCodes();
    
    for (String code : codes)
    {
        ExecutionContext context = new ExecutionContext();
        context.put("code", code);
        partitionMap.put(code, context);
    }
    return partitionMap;}}
    

    谢谢

    最佳答案

    我会说这是正确的方法,我不明白为什么你需要每 1000 个项目有一个线程,如果你按照唯一的分区代码进行分区并且有 1000 个项目的块,你将在每个线程的 1000 个项目上进行交易,这是 IMO 好的。

  • 除了保存唯一的分区代码外,您还可以计算如何
    你有很多每个分区的代码和分区甚至更多,通过
    为每 1000 个相同的分区代码创建新的子上下文(即
    具有即 2200 条记录的分区代码的方式,您将调用 3
    具有上下文参数的线程:1=> partition_key=key1,skip=0,
    count=1000, 2=>partition_key=key1, skip=1000, count=1000 和
    3=>partition_key=key1, skip=2000, count=1000) 如果这是你
    想要,但没有它我还是会去
  • 线程数由 ThreadPoolTaskExecutor 控制创建它时传递给分区步骤。你有方法 setCorePoolSize()您可以将其设置为 20,您将获得最多 20 个线程。下一个细粒度配置是 grid-size它告诉我们将从完整的分区映射中创建多少个分区。这是explanation of grid size .所以分区就是划分工作。之后,您的线程配置将定义实际处理的并发性。
  • 如果一个分区失败,则整个分区步骤失败,并显示哪个分区失败的信息。成功分区已完成并且不会再次调用,当作业重新启动时,它将通过重做失败和未处理的分区来从停止的地方开始。

  • 希望我能回答你所有的问题,因为有很多。

    案例 1 的示例- 也许有错误,但只是为了得到想法:
    public class MyPartitioner implements Partitioner{
    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        Map<String, int> codesWithCounts = getCodesWithCounts();
    
        for (Entry<String, int> codeWithCount : codesWithCounts.entrySet())
        {
            for (int i = 0; i < codeWithCount.getValue(); i + 1000){
                ExecutionContext context = new ExecutionContext();
                context.put("code", code);
                context.put("skip", i);
                context.put("count", 1000);
                partitionMap.put(code, context);
            }
        }
        return partitionMap;
    }
    

    Adn 比你翻页 1000,你从上下文中得到你应该跳过多少,在 2200 的例子中是:0, 1000, 2000。

    关于spring - 使用 Spring Batch 分区处理海量数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29226350/

    相关文章:

    java - Spring Batch 需要在一年内每周安排/运行一项作业

    linux - spring集成sftp入站 channel 适配器和临时文件问题

    java - 作业参数正在被缓存

    java - Spring事务与restful应用程序

    java - OptimisticLockingFailureException : Attempt to update step execution id=1 with wrong version (1),,其中 Spring Batch 代码中的当前版本为 2

    java - CORS 错误和带有 JWT token 的 403 响应

    Spring Boot 2.1.5 : Failed to replace {0} with Field Name on Validation Message

    java - Spring ldap 身份验证失败错误代码

    spring - 如何将命令行参数获取到 Spring Batch 作业中

    java - Spring-Batch如何在不同的数据库中写入作业元数据?