我正在使用spring批处理来解析文件,并且我有以下场景:
我正在做一份工作。这项工作必须解析一个给定的文件。由于意外原因(比如说停电),服务器出现故障,我必须重新启动机器。现在,重新启动服务器后,我想从停电前停止的点恢复工作。这意味着如果系统从 10.000 读取 1.300 行,现在必须从 1.301 行开始读取。
如何使用 Spring Batch 实现这种情况?
关于配置:我使用 spring-integration 在目录下轮询新文件。当文件到达时,spring-integration 会创建 spring 批处理作业。此外,spring-batch 使用 FlatFileItemReader 解析文件。
最佳答案
这是 JVM 崩溃后重新启 Action 业的完整解决方案。
job id="jobName" xmlns="http://www.springframework.org/schema/batch" restartable="true"
2.重新启 Action 业的代码
import java.util.Date;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
public class ResartJob {
@Autowired
private JobExplorer jobExplorer;
@Autowired
JobRepository jobRepository;
@Autowired
private JobLauncher jobLauncher;
@Autowired
JobOperator jobOperator;
public void restart(){
try {
List<JobInstance> jobInstances = jobExplorer.getJobInstances("jobName",0,1);// this will get one latest job from the database
if(CollectionUtils.isNotEmpty(jobInstances)){
JobInstance jobInstance = jobInstances.get(0);
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
if(CollectionUtils.isNotEmpty(jobExecutions)){
for(JobExecution execution: jobExecutions){
// If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
if(execution.getStatus().equals(BatchStatus.STARTED)){
execution.setEndTime(new Date());
execution.setStatus(BatchStatus.FAILED);
execution.setExitStatus(ExitStatus.FAILED);
jobRepository.update(execution);
jobOperator.restart(execution.getId());
}
}
}
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
3.
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="oracleLobHandler"/>
<bean id="oracleLobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>
<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" />
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean> <task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator" p:jobLauncher-ref="jobLauncher" p:jobExplorer-re`enter code here`f="jobExplorer" p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry"/>
关于服务器故障后 Spring Batch 恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15224959/