spring-boot - 需要一种方法来防止不需要的作业参数传播到下一次执行 spring boot 批处理作业

标签 spring-boot spring-batch

我正在使用 spring boot 2.1.2 和 spring batch 4.1.1 运行批处理应用程序。该应用程序使用 MySQL 数据库作为 Spring Batch 元数据数据源。

首先,我使用以下命令运行作业:

java -jar target/batchdemo-0.0.1-SNAPSHOT.jar -Dspring.batch.job.names=echo com.paypal.batch.batchdemo.BatchdemoApplication myparam1=value1 myparam2=value2

注意我传递了两个参数:

myparam1=value1 myparam2=value2

由于作业使用 RunIdIncrementer,因此应用程序使用的实际参数记录为:

Job: [SimpleJob: [name=echo]] completed with the following parameters: [{myparam2=value2, run.id=1, myparam1=value1}]

接下来我再次运行该作业,这次删除 myparam2:

java -jar target/batchdemo-0.0.1-SNAPSHOT.jar -Dspring.batch.job.names=echo com.paypal.batch.batchdemo.BatchdemoApplication myparam1=value1

这次作业再次运行时仍然包含 param2:

Job: [SimpleJob: [name=echo]] completed with the following parameters: [{myparam2=value2, run.id=2, myparam1=value1}]

这会导致调用业务逻辑,就像我再次将 myparam2 传递给应用程序一样。

有没有办法删除作业参数并且不将其传递给下一个实例?

应用代码:

package com.paypal.batch.batchdemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class BatchdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchdemoApplication.class, args);
    }

    @Autowired
    JobBuilderFactory jobBuilder;

    @Autowired
    StepBuilderFactory stepBuilder;

    @Autowired
    ParamEchoTasklet paramEchoTasklet;

    @Bean
    public RunIdIncrementer incrementer() {
        return new RunIdIncrementer();
    }

    @Bean
    public Job job() {
        return jobBuilder.get("echo").incrementer(incrementer()).start(echoParamsStep()).build();
    }

    @Bean
    public Step echoParamsStep() {
        return stepBuilder.get("echoParams").tasklet(paramEchoTasklet).build();
    }
}

package com.paypal.batch.batchdemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

@Component
public class ParamEchoTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        LOGGER.info("ParamEchoTasklet BEGIN");
        chunkContext.getStepContext().getJobParameters().entrySet().stream().forEachOrdered((entry) -> {
            String key = entry.getKey();
            Object value = entry.getValue();
            LOGGER.info("Param {} = {}", key, value);
        });
        LOGGER.info("ParamEchoTasklet END");
        return RepeatStatus.FINISHED;
    }

    private Logger LOGGER = LoggerFactory.getLogger(ParamEchoTasklet.class);
}

我调试了 spring batch 和 spring boot 代码,结果如下。 JobParametersBuilder line 273将最近的先前作业实例的参数与 JobParametersIncrementer 添加的任何参数一起添加到 nextParameters 映射:

List<JobExecution> previousExecutions = this.jobExplorer.getJobExecutions(lastInstances.get(0));
if (previousExecutions.isEmpty()) {
    // Normally this will not happen - an instance exists with no executions
    nextParameters = incrementer.getNext(new JobParameters());
}
else {
    JobExecution previousExecution = previousExecutions.get(0);
    nextParameters = incrementer.getNext(previousExecution.getJobParameters());
}

然后因为我使用的是 spring boot,JobLauncherCommandLineRunner line 213将先前的参数与为新执行传递的新参数合并,这导致旧参数被传递给新执行:

return merge(nextParameters, jobParameters);

如果没有参数,似乎不可能再次运行该作业,除非我遗漏了什么。会不会是 spring batch 的 bug?

最佳答案

RunIdIncrementer 的正常行为似乎增加了 JobExecution 的运行 ID,并传递了剩余的先验 JobParameters .我不会将此称为错误。

请记住,RunIdIncrementer 背后的想法只是更改一个标识参数以允许作业再次运行,即使之前使用相同(其他)参数的运行已成功完成并且重启尚未配置。

你总是可以通过实现 JobParametersIncrementer 创建一个定制的增量器.

另一种选择是使用 JobParametersBuilder构建一个 JobParameters 对象,然后使用 JobLauncher使用这些参数运行您的作业。如果我运行的作业具有相同的 JobParameters,我经常使用当前系统时间(以毫秒为单位)来创建唯一性。显然,您必须弄清楚从命令行(或其他任何地方)提取特定参数并迭代它们以填充 JobParameters 对象的逻辑。

例子:

public JobExecution executeJob(Job job) {
    JobExecution jobExecution = null;
    try {
        JobParameters jobParameters =
            new JobParametersBuilder()
                .addLong( "time.millis", System.currentTimeMillis(), true)
                .addString( "param1", "value1", true)
                .toJobParameters();
        jobExecution = jobLauncher.run( job, jobParameters );
    } catch ( JobInstanceAlreadyCompleteException | JobRestartException | JobParametersInvalidException | JobExecutionAlreadyRunningException e ) {
        e.printStackTrace();
    }
    return jobExecution;
}

关于spring-boot - 需要一种方法来防止不需要的作业参数传播到下一次执行 spring boot 批处理作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55750109/

相关文章:

spring - 在 Spring Boot 中读取文件的最佳方式

java - Spring Boot 应用程序执行器端点已注册且请求仍然失败

spring-mvc - Spring MVC 或 Spring Boot

java - Spring Batch+Boot 和 : [org/springframework/web/WebApplicationInitializer. 类]无法打开,因为它不存在

java - 在 Spring Batch 中在 ItemProcessor 之间传递数据的最佳方法?

java - SpringBatch itemReader 使用大量内存

java - 如何为 requestbody 发送自定义验证 Json 对象?

spring - 这个 ClientAbortException 是从哪里来的?

spring - 在 SQL Server Profiler 中显示 - 查询执行需要 1 毫秒,但在 Spring 应用程序中需要 30 毫秒,延迟在哪里?

spring-batch - 在作业中执行步骤时遇到错误