java - Spring 批处理3.0 : StepExecutionListener for a partitioned Step and cascading of execution context values to the partitioned job

标签 java spring-batch listener partitioning

给定一个使用分区的 Spring Batch 作业:

<job id="reportingJob" xmlns="http://www.springframework.org/schema/batch">
        <batch:listeners>
            <batch:listener ref="reportingJobExecutionListenerr" />
        </batch:listeners>
        <batch:step id="reportingMasterStep">
            <partition step="reportingSlaveStep"
                partitioner="reportingPartitioner">
                <batch:handler grid-size="10" task-executor="taskExecutor" />
            </partition>
        </batch:step>
</job>

reportingSlaveStep定义为:

<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
        <job ref="reportingSlaveJob" />
</step>

reportingSlaveJob定义为:

    <job id="reportingSlaveJob" xmlns="http://www.springframework.org/schema/batch">
        <batch:listeners>
            <batch:listener ref="reportsOutputListener" />
        </batch:listeners>
        <batch:split id="reportsCreationSplit"
            task-executor="taskExecutor">
            <batch:flow>
                <batch:step id="basicReportStep">
                    <tasklet throttle-limit="5" task-executor="taskExecutor">
                        <batch:chunk reader="basicReportReader"
                            writer="basicReportWriter" commit-interval="500" />
                    </tasklet>
                </batch:step>
            </batch:flow>
            <batch:flow>
                <batch:step id="advancedReportStep">
                    <tasklet throttle-limit="5" task-executor="taskExecutor">
                        <batch:chunk reader="advancedReportDataReader" writer="advancedReportWriter"
                            commit-interval="500" />
                    </tasklet>
                </batch:step>
            </batch:flow>
       </batch:split>
     </job>

我现在有 2 个问题:

  1. 我希望为每个分区创建一个新的 reportsOutputListener 实例。我可以通过将 reportsOutputListener 设为 Step 作用域 bean 来实现此目的吗?
  2. 我希望能够访问为 reportingJob 创建的相同 jobExecutionContext,以便在 reportingSlaveJob 中访问。我是否需要对此进行任何特殊处理,或者 reportingSlaveStepSlaveJob 也使用 reportingJob 中的相同 jobExecutionContext 实例吗?
  3. 编辑:当我运行上述作业时,有时会出现异常,提示“此作业的作业执行已在运行”,有时会出现 NullPointerExceptionMapExecutionContextDao.java:130 上。

编辑:另请注意,对于第 2 点,slaveJob 无法访问 stepExecutionContext 中添加的值(使用 >#{stepExecutionContext['msbfBatchId']}(在 Spring 配置 xml 中)由 reportingPartitioner 执行。针对该键的 stepExecutionContext 中的值显示为 null

最佳答案

I want a new reportsOutputListener instance to be created for each partition. Can I achieve this by making reportsOutputListener a Step scoped bean?

答案是。 (正如 Mahmoud Ben Hassine 的评论中提到的)

I want to be able to access the same jobExecutionContext created for reportingJob to be accessible in reportingSlaveJob. Do I need to any special handling for this or is the same jobExecutionContext instance in reportingJob is used by the reportingSlaveStepSlaveJob as well?

答案是。我深入研究了 Spring Batch 代码并发现 JobStep使用 JobParametersExtractor用于复制 stepExecutionContext 中的值到JobParameters 。这意味着reportingSlaveJob可以从 JobParameters 访问这些值而不是 StepExecutionContext 。也就是说,由于某种原因,DefaultJobParametersExtractor Srping Batch 3.0 中的实现似乎并未将值复制到 jobParameters正如预期的那样。我最终编写了以下自定义提取器:

public class CustomJobParametersExtractor implements JobParametersExtractor {

    private Set<String> keys;

    public CustomJobParametersExtractor () {
        this.keys = new HashSet<>();
    }

    @Override
    public JobParameters getJobParameters(Job job, StepExecution stepExecution) {
        JobParametersBuilder builder = new JobParametersBuilder();
        Map<String, JobParameter> jobParameters = stepExecution.getJobParameters().getParameters();
        ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
        ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();

        // copy job parameters from parent job to delegate job
        for (String key : jobParameters.keySet()) {
            builder.addParameter(key, jobParameters.get(key));
        }

        // copy job/step context from parent job/step to delegate job
        for (String key : keys) {
            if (jobExecutionContext.containsKey(key)) {
                builder.addString(key, jobExecutionContext.getString(key));
            } else if (stepExecutionContext.containsKey(key)) {
                builder.addString(key, stepExecutionContext.getString(key));
            } else if (jobParameters.containsKey(key)) {
                builder.addString(key, (String) jobParameters.get(key).getValue());
            }
        }
        return builder.toJobParameters();
    }

    public void setKeys(String[] keys) {
        this.keys = new HashSet<>(Arrays.asList(keys));
    }

}

然后我可以在报告从属步骤中使用上述提取器,如下所示:

<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
        <job ref="reportingSlaveJob" job-parameters-extractor="customJobParametersExtractor"/>
</step>

哪里customJobParametersExtractorCustomJobParametersExtractor 类型的 bean它传递了我们想要复制到 JobParameters 的所有 key 。的reportingSlaveJob

When I run the above job, at times I get an exception saying that the "A job execution for this job is already running" and other times I get a NullPointerException on MapExecutionContextDao.java:130

发生这种情况的原因是因为没有我的CustomJobParameterExtractorreportingSlaveJob正在以空 JobParameters 启动。为了让 Spring Batch 创建新的作业实例,每次启动 reportingSlaveJob 时作业参数必须不同。 。使用CustomJobParameterExtractor也解决了这个问题。

关于java - Spring 批处理3.0 : StepExecutionListener for a partitioned Step and cascading of execution context values to the partitioned job,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58947946/

相关文章:

Android AChartEngine graphicalView 分享事件

android - 如何连接 2 个不同的微调器

java.lang.NoClassDefFoundError : com/google/common/collect/ImmutableMap error using GeckoDriver Firefox through Selenium in Java

java - 使用 spring security 进行身份验证 - 预身份验证场景

java - BorderLayout West Center East all 等宽

java - Spring Batch Job的集成测试失败

java - NoClassDefFoundError:org/jagatoo/input/listeners/InputListener

java - 在分离的 Java 平台模块中使用不同版本的依赖项

具有 Multi-Tenancy 的 Spring Batch

spring - 使用 Spring Batch 分区处理海量数据