java - 在执行作业时检测到重复步骤

标签 java spring spring-batch

因此,我创建了一个示例批处理运行,以了解如何使作业重复进行,直到满足条件。在大多数情况下,它按预期工作,但我在执行程序时收到以下日志条目

org.springframework.batch.core.job.SimpleStepHandler:在执行作业时检测到重复步骤 [getStartingStep] = [infinateLoopJob]。如果任一步骤失败,都将在重新启动时再次执行。

虽然其他开发人员告诉我这没什么大不了的,但当我想依靠这些知识来编写生产代码时,我宁愿做正确的事,不要在路上发生任何奇怪的事情。我尝试将时间戳生成附加到步骤的名称,但没有成功(它只在初始创建步骤时附加了一次)。

所以简而言之,我怎样才能让每个步骤都不会一遍又一遍地使用相同的名称,并且仍然执行循环遍历整个作业的操作?

目前使用的代码:

AppStart.java

@SpringBootApplication(scanBasePackages="com.local.testJobLoop")
public class AppStart {

    private static final Logger logger = LoggerFactory.getLogger(com.local.testJobLoop.AppStart.class);

    AppStart() {
        super();
    }

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(AppStart.class);
        application.setWebEnvironment(false);
        ApplicationContext context = application.run(args);

        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job infinateLoopJob = context.getBean("infinateLoopJob", Job.class);

        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();

            JobExecution execution = jobLauncher.run(infinateLoopJob, jobParameters);
            BatchStatus status = execution.getStatus();

            logger.debug("Exit Status : {}", status);
            if (!BatchStatus.COMPLETED.equals(status)) {
                List<Throwable> exceptions = execution.getAllFailureExceptions();
                for (Throwable throwable : exceptions) {
                    logger.error("Batch Failure Exceptions:", throwable);
                }
            }
        } catch (JobExecutionAlreadyRunningException e) {
            logger.error("Job execution already running:", e);
        } catch (JobRestartException e) {
            logger.error("Illegal attempt to restart a job:", e);
        } catch (JobInstanceAlreadyCompleteException e) {
            logger.error("Illegal attempt to restart a job that was already completed successfully", e);
        } catch (JobParametersInvalidException e) {
            logger.error("Invalid job parameter:", e);
        }
        logger.debug("Done");
    }
}

AppJobConfig.java

@Configuration
@EnableBatchProcessing
public class AppJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public Job infinateLoopJob(@Qualifier("getStartingStep") final Step startingStep,
                               @Qualifier("getSecondStep") final Step secondStep) {
        return jobBuilderFactory.get("infinateLoopJob")
                .start(startingStep).on(Constants.STEP_EXIT_STATUS_CONTINUE).to(secondStep)
                .from(startingStep).on(Constants.STEP_EXIT_STATUS_COMPLETED).end()
                .from(secondStep).next(startingStep).build()
                .build();

    }
}

AppStepConfig.java

@Configuration
@EnableBatchProcessing
public class AppStepConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private static final Logger logger = LoggerFactory.getLogger(AppStepConfig.class);

    @Bean
    public Step getStartingStep(@Qualifier("startingActionTasklet") final StartingActionTasklet tasklet,
                                @Qualifier("startingActionListener") final StartingActionListener listener) {
        return stepBuilderFactory.get("getStartingStep")
                                 .tasklet(tasklet)
                                 .listener(listener)
                                 .build();
    }

    @Bean
    public Step getSecondStep(@Qualifier("secondActionTasklet") final SecondActionTasklet tasklet,
                              @Qualifier("secondActionListener") final SecondActionListener listener) {
        return stepBuilderFactory.get("getSecondStep")
                                 .tasklet(tasklet)
                                 .listener(listener)
                                 .build();
    }

StartingActionTasklet.java

@Component
public class StartingActionTasklet implements Tasklet, InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(StartingActionTasklet.class);

    public StartingActionTasklet() { super(); }

    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        int number = (int) chunkContext.getStepContext()
                                       .getStepExecution()
                                       .getJobExecution()
                                       .getExecutionContext()
                                       .get("incrementNumber");

        LOGGER.info("STARTING ACTION: Number is {number}");

        return RepeatStatus.FINISHED;
    }

    public void afterPropertiesSet() throws Exception { /* do nothing */ }
}

SecondActionTasklet.java

@Component
public class SecondActionTasklet implements Tasklet, InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecondActionTasklet.class);

    public SecondActionTasklet() { super(); }

    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        int number = (int) chunkContext.getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .get("incrementNumber");

        LOGGER.info("SECOND ACTION: Number is " + number);

        number++;

        LOGGER.info("SECOND ACTION: Number is now " + number);

        chunkContext.getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .put("incrementNumber", number);

        return RepeatStatus.FINISHED;

    }

    public void afterPropertiesSet() throws Exception {
        //do nothing
    }
}

StartingActionListener.java

@Component
public class StartingActionListener implements StepExecutionListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(StartingActionListener.class);

    public StartingActionListener() {
        super();
    }

    public void beforeStep(StepExecution stepExecution) {
        LOGGER.debug("StartingActionListener - beforeStep");

        // Get incrementNumber from job execution context
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        Integer incrementNumber = (Integer) jobContext.get("incrementNumber");

        if (incrementNumber == null) {
            jobContext.put("incrementNumber", 0);
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        LOGGER.debug("StartingActionListener - afterStep");

        // Get incrementNumber from job execution context
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        Integer incrementNumber = (Integer) jobContext.get("incrementNumber");

        // Continue job execution if have more feed, stop otherwise
        if (incrementNumber == null || incrementNumber > 10) {
            return new ExitStatus(Constants.STEP_EXIT_STATUS_COMPLETED);
        } else {
            return new ExitStatus(Constants.STEP_EXIT_STATUS_CONTINUE);
        }
    }
}

SecondActionListener.java

@Component
public class SecondActionListener implements StepExecutionListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecondActionListener.class);

    public void beforeStep(StepExecution stepExecution) {

    }

    public ExitStatus afterStep(StepExecution stepExecution) {
        LOGGER.debug("SecondActionListener - afterStep");

        return null;
    }
}

最佳答案

业务规则是否规定起始步骤第二步骤 需要分开且不同?如果没有,最简单的方法是将它们组合成一个步骤并利用 Repeat功能。

实际上,您的组合 Tasklet 只会返回 RepeatStatus.CONTINUABLE 直到所有工作完成,此时它会返回 RepeatStatus.FINISHED.

更新:所以你可以做的是向你的工作添加一个“decisionStepThree”,这会为工作添加新的步骤,尽管这会很恶心。

public class DecisionStepThreeTasklet implements Tasklet {

    @Autowired
    private SimpleJob job;

    @Autowired
    private Step startingStep;

    @Autowired
    private Step secondStep;

    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) {
        Collection<StepExecution> stepExecutions = chunkContext.getStepContext().
                getStepExecution().getJobExecution().getStepExecutions();

        int stepCount = stepExecutions.size();
        StepExecution lastStepExecution = getLastStep(stepExecutions);

        if (Constants.STEP_EXIT_STATUS_CONTINUE.equals(lastStepExecution.getExitStatus().getExitCode())) {
            job.addStep(copyStep(startingStep, "startingStep" + ++stepCount));
            job.addStep(copyStep(secondStep, "secondStep" + ++stepCount));
        }
        return RepeatStatus.FINISHED;
    }

    public StepExecution getLastStep(final Collection<StepExecution> c) {
        Iterator<StepExecution> itr = c.iterator();
        StepExecution lastElement = itr.next();
        while(itr.hasNext()) {
            lastElement=itr.next();
        }
        return lastElement;
    }

    private Step copyStep(final Step parent, final String name) {
        return new Step() {
            public String getName() {
                return name;
            }
            public boolean isAllowStartIfComplete() {
                return parent.isAllowStartIfComplete();
            }
            public int getStartLimit() {
                return parent.getStartLimit();
            }
            public void execute(final StepExecution stepExecution) throws JobInterruptedException {
                parent.execute(stepExecution);
            }
        };
    }    
}

关于java - 在执行作业时检测到重复步骤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43875765/

相关文章:

java - Spring Batch ItemReader使用PostgreSQL函数游标错误

Java - 如何使 jButton# 索引采用 int 变量的值?

java - 如何在 Spring Boot 和 RabbitMQ 中配置和接收 jSON 有效载荷并将其转换为域对象

java - 同一项目,同一分支,两台机器: unit tests producing inconsistent results

java - Spring 和 Hibernate 的延迟加载

java - 如何在Spring Batch中读取多个CSV文件来合并数据进行处理?

Spring 批处理 : Conditional Flow

java.lang.ClassNotFoundException : org.omnifaces.exceptionhandler.FullAjaxExceptionHandlerFactory

java - Thymeleaf 不适用于驼峰类字段

java - tomcat 和 Apache 之间的适配器