java - ItemStreamException : Failed to initialize the reader Caused by:IllegalStateException: Stream is already initialized. 重新打开之前关闭

标签 java spring-boot spring-batch

我重复了 Creating a Batch Service tutorial 中提供的示例

但我想在工作中添加第二步,因此它应该执行以下操作:

第一步(教程中已提供):从 csv 中读取人名并将其以大写形式保存在数据库中

第二步:从数据库名称中读取,将其转换为小写并插入到另一个数据库表中。

目前我的配置如下:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // tag::readerwriterprocessor[]
    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("users.csv"))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }


    @Bean
    public JdbcCursorItemReader<Person> dbReader(DataSource dataSource) {
        JdbcCursorItemReader<Person> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        BeanWrapperFieldSetMapper<Person> personBeanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<Person>() {{
            setTargetType(Person.class);
        }};
        reader.setRowMapper(new RowMapper<Person>() {
            @Override
            public Person mapRow(ResultSet resultSet, int i) throws SQLException {
                Person person = new Person();
                person.setFirstName(resultSet.getString(0));
                person.setLastName(resultSet.getString(1));
                return person;
            }
        });
        reader.setSql("SELECT first_name, last_name FROM people");
        reader.open(new ExecutionContext());
        return reader;
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public ToLowerCaseProcessor processor2() {
        return new ToLowerCaseProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource)
                .build();
    }
    // end::readerwriterprocessor[]

    // tag::jobstep[]
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1, Step step2) {
        return jobBuilderFactory.get("importUserJob")
                //.incrementer(new RunIdIncrementer())
                .listener(listener)
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_BEFORE!!!!!!!!!!!!!!!!");
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_AFTER!!!!!!!!!!!!!!!!");

                    }
                })
                .flow(step1)
                .next(step2)
                .end()
                .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

    @Bean
    public Step step2(JdbcBatchItemWriter<Person> writer, DataSource dataSource) {
        return stepBuilderFactory.get("step2")
                .<Person, Person>chunk(10)
                .reader(dbReader(dataSource))
                .processor(processor2())
                .writer(writer)
                .build();
    }
    // end::jobstep[]
}

但是当我在第二步启动 Spring Boot 应用程序时,我遇到了以下错误:

2019-08-02 14:33:54.164 ERROR 26972 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step step2 in job importUserJob

org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at com.sun.proxy.$Proxy44.run(Unknown Source) ~[na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:206) ~[spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:180) ~[spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:167) ~[spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:162) ~[spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:779) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:763) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at hello.Application.main(Application.java:10) ~[classes/:na]
Caused by: java.lang.IllegalStateException: Stream is already initialized.  Close before re-opening.
    at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:424) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:149) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    ... 34 common frames omitted

我不知道我做错了什么。直播中提到了什么?
请解释一下如何纠正它?

最佳答案

在您的 dbReader 方法中,您不应调用:

reader.open(new ExecutionContext());

它是由springbatch调用的。

关于java - ItemStreamException : Failed to initialize the reader Caused by:IllegalStateException: Stream is already initialized. 重新打开之前关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57325850/

相关文章:

java - oauth2 与 Spring Boot 集成测试

带有 Spring 数据的 Spring 批处理

java - 如何在 spring-batch 中创建异步步骤?

spring-batch - Spring 批处理中的型材

java - 使用 Numbus L&F 设置 JTree 选择背景

Java 泛型——这个语法有什么用?

java - Spring Boot Admin 日志中重复出现 AsyncRequestTimeoutException

java - Spring Boot 在部署到 Tomcat 期间无法加载外部 jar

java - Matcher 的 appendReplacement 方法忽略替换的反斜杠

java - GWT 项目中带有 google Chart API 的虚线