java - Spring Batch - 循环读取器/处理器/写入器步骤

标签 java spring spring-batch

回答

根据接受的答案代码,对该代码进行的以下调整对我有用:

// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(steps.size());         
        
    Flow[] flows = new Flow[steps.size()];
    for (int i = 0; i < steps.size(); i++) {
        flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
    }           

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
        .split(taskExecutor)                
        .add(flows)
        .build();
}

编辑

我已将问题更新为正确循环的版本,但随着应用程序的扩展,能够并行处理很重要,但我仍然不知道如何在运行时动态地使用 java-config 来做到这一点。 ..

改进的问题:我如何在运行时动态地创建读取器-处理器-写入器,例如 5 种不同的情况(5 个查询意味着 5 个循环,因为它现在已配置)?

我的 LoopDecider 看起来像这样:

public class LoopDecider implements JobExecutionDecider {
    
    private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
    private static final String COMPLETED = "COMPLETED";
    private static final String CONTINUE = "CONTINUE";
    private static final String ALL = "queries";
    private static final String COUNT = "count";
    
    private int currentQuery;
    private int limit;

    @SuppressWarnings("unchecked")
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
        this.limit = allQueries.size();
        jobExecution.getExecutionContext().put(COUNT, currentQuery);
        if (++currentQuery >= limit) {
            return new FlowExecutionStatus(COMPLETED);
        } else {
            LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
            return new FlowExecutionStatus(CONTINUE);
        }       
    }

}

基于查询列表(HQL 查询),我希望每个查询都有一个读取器 - 处理器 - 编写器。我当前的配置如下所示:

工作

@Bean
public Job subsetJob() throws Exception {
    LoopDecider loopDecider = new LoopDecider();        
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
    Flow flow = flowBuilder
            .start(createHQL())
            .next(extractData())
            .next(loopDecider)
            .on("CONTINUE")
            .to(extractData())
            .from(loopDecider)
            .on("COMPLETED")                
            .end()
            .build();       
    
    return jobBuilderFactory.get("subsetJob")               
            .start(flow)                
            .end()
            .build();
}

步骤

public Step extractData(){
    return stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

阅读器

public HibernateCursorItemReader reader(){      
    CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());        
    reader.setUseStatelessSession(false);
    return reader;
}

处理器

public DynamicRecordProcessor processor(){
    return new DynamicRecordProcessor();
}

作者

public FlatFileItemWriter writer(){
    CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();               
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
        }}
    );
    return writer;
}

目前,该过程适用于单个查询。但是,我实际上有一个查询列表。

我最初的想法是循环该步骤并将查询列表传递给该步骤,并为每个查询读取 - 处理 - 写入。这也是并行分块的理想选择。

但是,当我将查询列表作为参数添加到提取数据步骤并为每个查询创建一个步骤时,将返回一个步骤列表,而不是预期的单个步骤。作业开始提示它需要一个步骤而不是一系列步骤。

另一个想法是创建一个自定义的 MultiHibernateCursorItemReader,其想法与 MultiItemResourceReader 相同,但是,我真的在寻找一个更开箱即用的解决方案。

@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
    List<Step> steps = new ArrayList<Step>();
    for (String query : queries) {
        steps.add(stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader(query))
            .processor(processor())
            .writer(writer(query))
            .build());
    }
    return steps;
}

问题
如何循环步骤并将其集成到作业中?

最佳答案

不要将您的步骤、读取器、处理器和写入器实例化为 Spring-Beans。没有必要这样做。只有您的作业实例必须是 Spring Bean。

因此只需从您的步骤、阅读器、编写器和处理器创建器方法中删除@Bean 和@StepScope 配置,并在需要的地方实例化它们。

只有一个问题,您必须手动调用 afterPropertiesSet()。例如:

// @Bean -> delete
// @StepScope -> delete
public FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){
    FlatFileItemWriter writer = new FlatFileItemWriter();
    writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));       
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
        }}
    );

    // ------- ADD!!
    writer.afterPropertiesSet();

    return writer;
}

这样,您的步骤、读取器、写入器实例将自动“确定步骤范围”,因为您为每个步骤显式实例化了它们。

如果我的回答不够清楚,请告诉我。然后我将添加一个更详细的示例。

编辑

一个简单的例子:

@Configuration
public class MyJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    List<String> filenames = Arrays.asList("file1.txt", "file2.txt");

    @Bean
    public Job myJob() {

       List<Step> steps = filenames.stream().map(name -> createStep(filename));

       return jobBuilderFactory.get("subsetJob")               
            .start(createParallelFlow(steps));                
            .end()
            .build();
    }


    // helper method to create a step
    private Step createStep(String filename) {
    {
        return stepBuilderFactory.get("convertStepFor" + filename); // !!! Stepname has to be unique
            .chunk(100_000)
            .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper()));
            .processor(new YourConversionProcessor());
            .writer(createFileWriter(new FileSystemResource(new File("converted_"+filename)), new YourOutputLineAggregator()));
            .build();
    }


    // helper method to create a split flow out of a List of steps
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream() // we have to convert the steps to a flows
            .map(step -> //
                    new FlowBuilder<Flow>("flow_" + step.getName()) //
                    .start(step) //
                    .build()) //
            .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) //
            .add(flows.toArray(new Flow[flows.size()])) //
            .build();
    }


    // helper methods to create filereader and filewriters
    public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {
        FlatFileItemReader<T> reader = new FlatFileItemReader<>();

        reader.setEncoding("UTF-8");
        reader.setResource(source);
        reader.setLineMapper(lineMapper);
        reader.afterPropertiesSet();

        return reader;
    }

    public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
        FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();

        writer.setEncoding("UTF-8");
        writer.setResource(target);
        writer.setLineAggregator(aggregator);

        writer.afterPropertiesSet();
        return writer;
    }
}

关于java - Spring Batch - 循环读取器/处理器/写入器步骤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37238813/

相关文章:

java - Mokito 不返回我的值实例调用数据库

java - @Resource 注释由 spring 和应用程序服务器拾取

java - Hibernate 4 升级 (java.lang.NoSuchMethodError : javax. persistence.spi.PersistenceUnitInfo.getValidationMode())

java - Spring 批处理 JobRepository 位置和缩放

java - JDBC中如何使用 "set @variable = 0"

java - Java 中有什么方法可以强制执行最小窗口大小吗?

java - 如何为 Spring XD 配置 Spring InboundChannelAdapter?

sql - JdbcBatchItemWriter 中的更新语句

java - JSR 352 分区 block 处理

java - 通过 zuul 访问时,Spring session 作用域 bean 在请求之间重置