java - 使用 ClassifierCompositeItemWriter 的 Spring Batch java 配置错误

标签 java spring spring-batch

我正在使用带有java配置的Spring Batch(这是新的),并且在尝试使用ClassifierCompositeItemWriter时遇到错误,因此根据分类器生成单独的文件。

我得到的错误是org.springframework.batch.item.WriterNotOpenException:Writer必须先打开才能写入

我的配置如下:

    package com.infonova.btcompute.batch.geneva.job;

import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusFinishedJobAssignment;
import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusInprogressJobAssignment;
import com.infonova.btcompute.batch.billruntransfer.BillRunTransferStatus;
import com.infonova.btcompute.batch.geneva.camel.GenevaJobLauncher;
import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto;
import com.infonova.btcompute.batch.geneva.dto.GenveaDetailsTransactionDto;
import com.infonova.btcompute.batch.geneva.properties.GenevaDetailsExportJobProperties;
import com.infonova.btcompute.batch.geneva.rowmapper.GenevaDetailsTransactionsRowMapper;
import com.infonova.btcompute.batch.geneva.steps.*;
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper;
import com.infonova.btcompute.batch.utils.FileNameGeneration;
import com.infonova.product.batch.camel.CamelEnabledJob;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.classify.BackToBackPatternClassifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;


public abstract class AbstractGenevaDetailsExportJob extends CamelEnabledJob {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGenevaDetailsExportJob.class);

    @Autowired
    protected JobBuilderFactory jobBuilders;

    @Autowired
    protected StepBuilderFactory stepBuilders;

    @Autowired
    protected DataSource datasource;

    @Autowired
    private BillrunTransferStatusMapper billrunTransferStatusMapper;

    @Autowired
    protected JdbcTemplate jdbcTemplate;


    public abstract GenevaDetailsExportJobProperties jobProperties();

    @Bean
    public RouteBuilder routeBuilder(final GenevaDetailsExportJobProperties jobProperties, final Job job) {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(jobProperties.getConsumer())
                        .transacted("PROPAGATION_REQUIRED")
                        .routeId(jobProperties.getInputRouteName())
                        .process(genevaJobLauncher(job));
                        //.to("ftp://app@127.0.0.1?password=secret");
            }
        };
    }

    @Bean
    public Processor genevaJobLauncher(Job job) {
        return new GenevaJobLauncher(job);
    }

    @Bean
    @StepScope
    public GenevaDetailsReader reader() {
        GenevaDetailsReader reader = new GenevaDetailsReader(jobProperties().getMandatorKey(),
                jobProperties().getInvoiceType(), jobProperties().getSqlResourcePath());
        reader.setSql("");
        reader.setDataSource(datasource);
        reader.setRowMapper(new GenevaDetailsTransactionsRowMapper());
        reader.setFetchSize(jobProperties().getFetchSize());
        return reader;
    }

    @Bean
    @StepScope
    public GenevaDetailsItemProcessor processor() {
        return new GenevaDetailsItemProcessor();
    }

    @Bean
    @StepScope
    public ClassifierCompositeItemWriter writer() {

        List<String> serviceCodes = new ArrayList<>();//billrunTransferStatusMapper.getServiceCodes(jobProperties().getMandatorKey());
        Long billingTaskId = billrunTransferStatusMapper.getCurrentTaskId(jobProperties().getMandatorKey());
        String countryKey = billrunTransferStatusMapper.getCountryKey(billingTaskId);
        serviceCodes.add("BTCC");
        serviceCodes.add("CCMS");

        BackToBackPatternClassifier classifier = new BackToBackPatternClassifier();
        classifier.setRouterDelegate(new GenveaDetailsRouterClassifier());

        HashMap<String, Object> map = new HashMap<>();

        for (String serviceCode : serviceCodes) {
            map.put(serviceCode, genevaDetailsWriter(serviceCode, countryKey));
        }

        classifier.setMatcherMap(map);
        ClassifierCompositeItemWriter<GenveaDetailsTransactionDto> writer = new ClassifierCompositeItemWriter<>();
        writer.setClassifier(classifier);
        return writer;

    }


    @Bean
    @StepScope
    public GenevaDetailsFlatFileItemWriter genevaDetailsWriter(String serviceCode, String countryKey) {
        GenevaDetailsFlatFileItemWriter writer = new GenevaDetailsFlatFileItemWriter(jobProperties().getDelimiter());

        FileNameGeneration fileNameGeneration = new FileNameGeneration();

        try {
            FileSystemResource fileSystemResource = new FileSystemResource(new File(jobProperties().getExportDir(), fileNameGeneration.generateFileName(jdbcTemplate,
                    serviceCode, countryKey)));
            writer.setResource(fileSystemResource);
        } catch (SQLException e) {
            LOGGER.error("Error creating FileSystemResource : " + e.getMessage());
        }
        return writer;
    }

    @Bean
    public Job job() {
        return jobBuilders.get(jobProperties().getJobName())
                .start(setBillRunTransferStatusDetailInprogressStep())
                .next(processGenevaDetailsStep())
                .next(setBillRunTransferStatusProcessedStep())
                .build();
    }

    @Bean
    public Step setBillRunTransferStatusDetailInprogressStep() {
        return stepBuilders.get("setBillRunTransferStatusDetailInprogressStep")
                .tasklet(setBillRunTransferStatusDetailInprogress())
                .build();
    }

    @Bean
    public Tasklet setBillRunTransferStatusDetailInprogress() {
        return new BillRunTranStatusInprogressJobAssignment(BillRunTransferStatus.SUMMARY.toString(), BillRunTransferStatus.DETAILS_INPROGRESS.toString(),
                jobProperties().getMandatorKey(), jobProperties().getInvoiceTypeNum(), jobProperties().getReportTypeNum());
    }

    @Bean
    public Step setBillRunTransferStatusProcessedStep() {
        return stepBuilders.get("setBillRunTransferStatusProcessedStep")
                .tasklet(setBillRunTransferStatusProcessed())
                .build();
    }

    @Bean
    public Tasklet setBillRunTransferStatusProcessed() {
        return new BillRunTranStatusFinishedJobAssignment(BillRunTransferStatus.PROCESSED.toString());
    }

    @Bean
    public Step processGenevaDetailsStep() {
        return stepBuilders.get("processGenevaDetailsStep")
                .<GenveaDetailsTransactionDto, GenevaDetailsResultsDto>chunk(jobProperties().getChunkSize())
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

}

我的作家看起来像:

package com.infonova.btcompute.batch.geneva.steps;

import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto;
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper;
import com.infonova.btcompute.batch.utils.FileNameGeneration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.*;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

@Component
public class GenevaDetailsFlatFileItemWriter extends FlatFileItemWriter<GenevaDetailsResultsDto> {

    private static final Logger LOGGER = LoggerFactory.getLogger(GenevaDetailsFlatFileItemWriter.class);

    @Autowired
    protected JdbcTemplate jdbcTemplate;

    @Autowired
    private BillrunTransferStatusMapper billrunTransferStatusMapper;


    private String delimiter;


    public GenevaDetailsFlatFileItemWriter(String delimiter) {
        this.delimiter = delimiter;
        this.setLineAggregator(getLineAggregator());
        this.setHeaderCallback(getHeaderCallback());
    }

    private DelimitedLineAggregator<GenevaDetailsResultsDto> getLineAggregator() {
        DelimitedLineAggregator<GenevaDetailsResultsDto> delLineAgg = new DelimitedLineAggregator<>();
        delLineAgg.setDelimiter(delimiter);

        BeanWrapperFieldExtractor<GenevaDetailsResultsDto> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(getNames());
        delLineAgg.setFieldExtractor(fieldExtractor);

        return delLineAgg;
    }

    private String[] getHeaderNames() {
        return new String[] {"Record ID", "Service Identifier", "Billing Account Reference", "Cost Description", "Event Cost",
                "Event Date and Time", "Currency Code", "Charge Category", "Order Identifier", "Net Usage", "UOM",
                "Quantity", "Service Start Date", "Service End Date"};
    }

    private String[] getNames() {
        return new String[] {"RECORD_ID", "SERVICE_CODE", "BILLING_ACCOUNT_REFERENCE", "COST_DESCRIPTION", "EVENT_COST",
                "EVENT_DATE_AND_TIME", "CURRENCY_CODE", "CHARGE_CATEGORY", "ORDER_IDENTIFIER", "NET_USAGE", "UOM",
                "QUANTITY", "SERVICE_START_DATE", "SERVICE_END_DATE"};
    }



    private FlatFileHeaderCallback getHeaderCallback()
    {
        return new FlatFileHeaderCallback() {
            @Override
            public void writeHeader(Writer writer) throws IOException {
                writer.write(String.join(delimiter, getHeaderNames()));
            }
        };
    }

//    @BeforeStep
//    public void beforeStep(StepExecution stepExecution) {
//        billingTaskId = (Long) stepExecution.getJobExecution().getExecutionContext().get("billingTaskId");
//        FileNameGeneration fileNameGeneration = new FileNameGeneration();
//
//        try {
//            FileSystemResource fileSystemResource = new FileSystemResource(new File(exportDir, fileNameGeneration.generateFileName(jdbcTemplate,
//                    serviceCode, billrunTransferStatusMapper.getCountryKey(billingTaskId))));
//            setResource(fileSystemResource);
//        } catch (SQLException e) {
//            LOGGER.error("Error creating FileSystemResource : " + e.getMessage());
//        }
//    }
}

我在网上搜索过,没有找到该问题的解决方案。

最佳答案

@Hansjoerg Wingeier 所写的关于 ClassifierCompositeItemWriter 的内容是正确的,但解决问题的正确方法是使用 AbstractTaskletStepBuilder.stream() 将委托(delegate)编写器注册为流。让 SB 管理执行上下文生命周期。

关于java - 使用 ClassifierCompositeItemWriter 的 Spring Batch java 配置错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39229496/

相关文章:

java - 添加两个数字而不是合并

java - 修复 PMD "CollapsibleIfStatements"违规后对代码性能有影响吗?

spring - 如何从 application.properties 中读取并在 Spring Batch 中设置为作业参数

spring - 将动态资源注入(inject) Spring Batch ItemReader

java - 主类运行具有依赖项的可执行 jar 时出现 ClassNotFoundException

java - 谁能解释一下 java 是如何设计 HashMap 的 hash() 函数的?

angular - Spring Boot 安全性未加载 Angular 7 应用程序构建文件

java - 如何在 spring jdbctemplate 中编写查询来获取任意条件的多行?

java - 测试基于注解的 RequestInterceptor

java - Spring Batch 中的 JMS 读取器导致 NoClassDefFoundError