java - 适用于 PostgreSQL 的 Spring Batch StoredProcedureItemReader - java.lang.IllegalStateException

标签 java spring spring-boot spring-batch

我收到非法状态异常。我尝试通过设置以下参数来解决它

etlReader.setSaveState(false);
etlReader.setUseSharedExtendedConnection(false);

有人知道发生了什么事吗?

我在application.properties中的配置如下:

spring.datasource.url=jdbc:postgresql://localhost:5432/

spring.datasource.username=user

spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect

spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true

spring.jpa.show-sql=true spring.batch.initialize-schema=always

@Scheduled param

etl.scheduler.frequency=3600000

这是ItemReader实现的重要部分。如果需要任何其他信息,请告诉我。

@SuppressWarnings("rawtypes")
@Bean
public StoredProcedureItemReader generateEvents(DataSource dataSource) {

    StoredProcedureItemReader<Event> etlReader = new StoredProcedureItemReader<Event>();
    etlReader.setDataSource(dataSource);
    etlReader.setProcedureName("myStoredProcedure");
    etlReader.setFunction(false);
    etlReader.setRowMapper(etlNoOpRowMapper());     
    return etlReader;
}

@Bean
    public NoOpEventETLWriter etljobWriter() {
        return new NoOpMyDataETLWriter();
    }

    @Bean(name = "executeETLStep")
    public Step executeETLStep(ItemReader<Event> generateEvents) {
        
        return stepBuilderFactory.get("executeETLStep").<MyData, MyData>chunk(1).reader(generateEvents)
                .writer(etljobWriter()).build();
    }

    @Bean(name = "etlJob")
    public Job etlJob(CardinalBatchCompletionListener listener, @Qualifier("executeETLStep") Step executeETLStep) {
        return jobBuilderFactory.get("etlJob").incrementer(new RunIdIncrementer()).listener(listener)
                .flow(executeETLStep).end().build();
    }

堆栈跟踪:

org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
  at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
  at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
  at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at com.sun.proxy.$Proxy73.run(Unknown Source) ~[na:na] at com.domain.name.MyJobScheduler.runBatch(JobScheduler.java:40) ~[classes!/:0.0.1-SNAPSHOT]
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
  at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
  at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na] at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
  at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
  at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Caused by: java.lang.IllegalStateException: Stream is already initialized. Close before re-opening.
  at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
  at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:424) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:149) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
  ... 37 common frames omitted

编辑

我的所有 bean 都在配置文件中,因此一旦我启动它们就会被初始化。并使用调度程序运行作业。为什么作业执行再次尝试重新初始化资源?

最佳答案

如果您的调度程序同时运行两个作业实例,则读取器将被共享,这就是导致问题的原因:一个作业打开读取器,然后在关闭它之前,另一个作业也尝试打开它。

您需要确定读者的工作范围(步骤范围也可以)。

关于java - 适用于 PostgreSQL 的 Spring Batch StoredProcedureItemReader - java.lang.IllegalStateException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55754124/

相关文章:

java - Sprint启动kafka Consumer无法连接到kafka容器

java - 在包括开始日期在内的两个日期之间迭代?

java - 避免 OLTP 上的 Spring Batch 作业

java - 通过 Spring Boot 和 Vaadin 传播代码更​​改,无需在 Eclipse 中重新启动应用程序

java - Java 中的泛型/接口(interface)和树结构

java - Spring 事务不回滚

javascript - 显示使用 Spring、Spring Security、Hibernate、jQuery、Backbone、AJAX 的 web-app 的当前登录用户的名称

java - 我们应该在 Spring Boot 中为存储库层编写单元测试和集成测试吗?

java - 如果条件基于 Activeprofile

java - 使用 Java/Jackcess 从加密的 Access .mdb 中读取