给定一个使用分区的 Spring Batch 作业:
<job id="reportingJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="reportingJobExecutionListenerr" />
</batch:listeners>
<batch:step id="reportingMasterStep">
<partition step="reportingSlaveStep"
partitioner="reportingPartitioner">
<batch:handler grid-size="10" task-executor="taskExecutor" />
</partition>
</batch:step>
</job>
和reportingSlaveStep
定义为:
<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
<job ref="reportingSlaveJob" />
</step>
和reportingSlaveJob
定义为:
<job id="reportingSlaveJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="reportsOutputListener" />
</batch:listeners>
<batch:split id="reportsCreationSplit"
task-executor="taskExecutor">
<batch:flow>
<batch:step id="basicReportStep">
<tasklet throttle-limit="5" task-executor="taskExecutor">
<batch:chunk reader="basicReportReader"
writer="basicReportWriter" commit-interval="500" />
</tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="advancedReportStep">
<tasklet throttle-limit="5" task-executor="taskExecutor">
<batch:chunk reader="advancedReportDataReader" writer="advancedReportWriter"
commit-interval="500" />
</tasklet>
</batch:step>
</batch:flow>
</batch:split>
</job>
我现在有 2 个问题:
- 我希望为每个分区创建一个新的
reportsOutputListener
实例。我可以通过将reportsOutputListener
设为Step
作用域 bean 来实现此目的吗? - 我希望能够访问为
reportingJob
创建的相同jobExecutionContext
,以便在reportingSlaveJob
中访问。我是否需要对此进行任何特殊处理,或者reportingSlaveStepSlaveJob
也使用reportingJob
中的相同jobExecutionContext
实例吗? - 编辑:当我运行上述作业时,有时会出现异常,提示“此作业的作业执行已在运行”,有时会出现
NullPointerException
在MapExecutionContextDao.java:130
上。
编辑:另请注意,对于第 2 点,slaveJob
无法访问 stepExecutionContext
中添加的值(使用 >#{stepExecutionContext['msbfBatchId']}
(在 Spring 配置 xml 中)由 reportingPartitioner
执行。针对该键的 stepExecutionContext
中的值显示为 null
。
最佳答案
I want a new reportsOutputListener instance to be created for each partition. Can I achieve this by making reportsOutputListener a Step scoped bean?
答案是是。 (正如 Mahmoud Ben Hassine 的评论中提到的)
I want to be able to access the same jobExecutionContext created for reportingJob to be accessible in reportingSlaveJob. Do I need to any special handling for this or is the same jobExecutionContext instance in reportingJob is used by the reportingSlaveStepSlaveJob as well?
答案是否。我深入研究了 Spring Batch 代码并发现 JobStep
使用 JobParametersExtractor
用于复制 stepExecutionContext
中的值到JobParameters
。这意味着reportingSlaveJob
可以从 JobParameters
访问这些值而不是 StepExecutionContext
。也就是说,由于某种原因,DefaultJobParametersExtractor
Srping Batch 3.0 中的实现似乎并未将值复制到 jobParameters
正如预期的那样。我最终编写了以下自定义提取器:
public class CustomJobParametersExtractor implements JobParametersExtractor {
private Set<String> keys;
public CustomJobParametersExtractor () {
this.keys = new HashSet<>();
}
@Override
public JobParameters getJobParameters(Job job, StepExecution stepExecution) {
JobParametersBuilder builder = new JobParametersBuilder();
Map<String, JobParameter> jobParameters = stepExecution.getJobParameters().getParameters();
ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
// copy job parameters from parent job to delegate job
for (String key : jobParameters.keySet()) {
builder.addParameter(key, jobParameters.get(key));
}
// copy job/step context from parent job/step to delegate job
for (String key : keys) {
if (jobExecutionContext.containsKey(key)) {
builder.addString(key, jobExecutionContext.getString(key));
} else if (stepExecutionContext.containsKey(key)) {
builder.addString(key, stepExecutionContext.getString(key));
} else if (jobParameters.containsKey(key)) {
builder.addString(key, (String) jobParameters.get(key).getValue());
}
}
return builder.toJobParameters();
}
public void setKeys(String[] keys) {
this.keys = new HashSet<>(Arrays.asList(keys));
}
}
然后我可以在报告从属步骤中使用上述提取器,如下所示:
<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
<job ref="reportingSlaveJob" job-parameters-extractor="customJobParametersExtractor"/>
</step>
哪里customJobParametersExtractor
是 CustomJobParametersExtractor
类型的 bean它传递了我们想要复制到 JobParameters
的所有 key 。的reportingSlaveJob
。
When I run the above job, at times I get an exception saying that the "A job execution for this job is already running" and other times I get a NullPointerException on MapExecutionContextDao.java:130
发生这种情况的原因是因为没有我的CustomJobParameterExtractor
,reportingSlaveJob
正在以空 JobParameters
启动。为了让 Spring Batch 创建新的作业实例,每次启动 reportingSlaveJob
时作业参数必须不同。 。使用CustomJobParameterExtractor
也解决了这个问题。
关于java - Spring 批处理3.0 : StepExecutionListener for a partitioned Step and cascading of execution context values to the partitioned job,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58947946/