服务器故障后 Spring Batch 恢复

标签 spring batch-processing

我正在使用spring批处理来解析文件,并且我有以下场景:

我正在做一份工作。这项工作必须解析一个给定的文件。由于意外原因(比如说停电),服务器出现故障,我必须重新启动机器。现在,重新启动服务器后,我想从停电前停止的点恢复工作。这意味着如果系统从 10.000 读取 1.300 行,现在必须从 1.301 行开始读取。

如何使用 Spring Batch 实现这种情况?

关于配置:我使用 spring-integration 在目录下轮询新文件。当文件到达时,spring-integration 会创建 spring 批处理作业。此外,spring-batch 使用 FlatFileItemReader 解析文件。

最佳答案

这是 JVM 崩溃后重新启 Action 业的完整解决方案。

  • 通过设置 restarable="true"
  • 使作业可重新启动

    job id="jobName" xmlns="http://www.springframework.org/schema/batch" restartable="true"



    2.重新启 Action 业的代码
    import java.util.Date;
    import java.util.List;
    import org.apache.commons.collections.CollectionUtils;
    import org.springframework.batch.core.BatchStatus;
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobInstance;
    import org.springframework.batch.core.explore.JobExplorer;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.launch.JobOperator;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.beans.factory.annotation.Autowired;
    
    public class ResartJob {
    
        @Autowired
        private JobExplorer jobExplorer;
        @Autowired
        JobRepository jobRepository;
        @Autowired
        private JobLauncher jobLauncher;
        @Autowired 
        JobOperator jobOperator;
    
        public void restart(){
            try {
                List<JobInstance> jobInstances = jobExplorer.getJobInstances("jobName",0,1);// this will get one latest job from the database
                if(CollectionUtils.isNotEmpty(jobInstances)){
                   JobInstance jobInstance =  jobInstances.get(0);
                   List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
                   if(CollectionUtils.isNotEmpty(jobExecutions)){
                       for(JobExecution execution: jobExecutions){
                           // If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
                           if(execution.getStatus().equals(BatchStatus.STARTED)){ 
                               execution.setEndTime(new Date());
                               execution.setStatus(BatchStatus.FAILED);                               
                               execution.setExitStatus(ExitStatus.FAILED);                               
                               jobRepository.update(execution);
                               jobOperator.restart(execution.getId());
                           }
                       }
                   }
                }
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
    }
    

    3.
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="oracleLobHandler"/>
    
    <bean id="oracleLobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>
    
    
    <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" />
    
    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
            <property name="jobRepository" ref="jobRepository" />
            <property name="taskExecutor" ref="jobLauncherTaskExecutor" /> 
    </bean> <task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
    
    <bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator" p:jobLauncher-ref="jobLauncher" p:jobExplorer-re`enter code here`f="jobExplorer" p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry"/>
    

    关于服务器故障后 Spring Batch 恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15224959/

    相关文章:

    java - angularjs剩余调用以json格式返回xml

    node.js - Node.js批处理

    ffmpeg - 如何使用 ffmpeg 转换整个目录?

    amazon-web-services - AWS BATCH - 如何运行更多并发作业

    batch-file - 如何批量传输重命名?

    java - Spring 无状态 Rest Web 服务

    java - 通过调用另一个bean的方法来创建bean

    java - Spring 计划任务未在 Windows 7 上执行

    java - Spring Boot数据源延迟初始化

    lua - 使用 ClassNLCriterion 在 Torch 中进行批处理