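Before the Spring Batch job runs, I have an import table that contains all the items that need to be imported into our system. At that point it has already been verified to contain only items that do not yet exist in our system.
Next, I have a Spring Batch job that reads from this import table using a JpaPagingItemReader. Once an item has been processed, the job writes it to the database using an ItemWriter.
I run with a page size and chunk size of 10000. When running on MySQL InnoDB, this works perfectly. I can even use multi-threading and everything works fine.
But now we are migrating to PostgreSQL, and the same batch job runs into some very strange problems: it tries to insert duplicates into our system. These are naturally rejected by a unique index constraint, which throws an error. Since the import table is verified to contain only non-existing items before the batch job starts, the only explanation I can think of is that, when running on Postgres, the JpaPagingItemReader reads some rows from the import table more than once. But why would it do that?
I have tried many settings. Reducing the chunk and page size to around 100 only makes the import slower; the same error still occurs. Running single-threaded instead of multi-threaded only makes the error occur later. So why does my JpaPagingItemReader read the same items multiple times, and only on PostgreSQL? The select statement backing the reader is simple; it is a NamedQuery: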
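One way to test this hypothesis before changing anything is to record the key of every item that comes out of the reader and report any repeats. The helper below is a hypothetical sketch, not part of Spring Batch: it assumes ImportDTO has some unique key (the question does not show one), and it uses concurrent sets because the step runs on several threads at once.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical diagnostic helper: remembers every key handed out by the reader
 * and collects the keys that show up more than once. Backed by concurrent sets,
 * so it can be called from several chunk threads at the same time.
 */
public class DuplicateReadTracker<K> {
    private final Set<K> seen = ConcurrentHashMap.newKeySet();
    private final Set<K> duplicates = ConcurrentHashMap.newKeySet();

    /** Records the key; returns true if it had already been recorded before. */
    public boolean record(K key) {
        boolean alreadySeen = !seen.add(key);
        if (alreadySeen) {
            duplicates.add(key);
        }
        return alreadySeen;
    }

    /** Read-only view of all keys that were recorded more than once. */
    public Set<K> duplicates() {
        return Collections.unmodifiableSet(duplicates);
    }

    public static void main(String[] args) {
        DuplicateReadTracker<Long> tracker = new DuplicateReadTracker<>();
        for (long id : new long[] {1L, 2L, 3L, 2L, 4L}) {
            tracker.record(id);
        }
        System.out.println("duplicates: " + tracker.duplicates()); // duplicates: [2]
    }
}
```

In the job, a call such as `tracker.record(item.getId())` at the start of `OrganizationInsertProcessor.process` would show which rows arrive twice; `getId()` is an assumed accessor.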
@NamedQuery(name = "ImportDTO.findAllForInsert",
        query = "select h from ImportDTO h where h.toBeImported = true")
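Also note that the batch job never changes the toBeImported flag while it runs, so this query should return the same result before, during, and after the batch job.
Any insights, tips, or help are greatly appreciated!
Here is the batch configuration code: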
import javax.persistence.EntityManagerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private OrganizationItemWriter organizationItemWriter;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Autowired
    private OrganizationUpdateProcessor organizationUpdateProcessor;

    @Autowired
    private OrganizationInsertProcessor organizationInsertProcessor;

    private Integer organizationBatchSize = 10000;
    private Integer organizationThreadSize = 3;
    private Integer maxThreadSize = organizationThreadSize;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findNewImportsToImport() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForInsert");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findImportsToUpdate() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForUpdate");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public OrganizationItemWriter writer() throws Exception {
        return organizationItemWriter;
    }

    @Bean
    public StepExecutionNotificationListener stepExecutionListener() {
        return new StepExecutionNotificationListener();
    }

    @Bean
    public ChunkExecutionListener chunkListener() {
        return new ChunkExecutionListener();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(maxThreadSize);
        return taskExecutor;
    }

    @Bean
    public Job importOrganizationsJob(JobCompletionNotificationListener listener) throws Exception {
        return jobBuilderFactory.get("importAndUpdateOrganizationJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(importNewOrganizationsFromImports())
                .next(updateOrganizationsFromImports())
                .build();
    }

    @Bean
    public Step importNewOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("importNewOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findNewImportsToImport())
                .processor(organizationInsertProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }

    @Bean
    public Step updateOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("updateOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findImportsToUpdate())
                .processor(organizationUpdateProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }
}
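Best answer
You need to add an ORDER BY clause to the select. JpaPagingItemReader fetches each page with a separate query that sets a first-result offset and a maximum row count (LIMIT/OFFSET on PostgreSQL), and without an ORDER BY the database does not guarantee that those separate queries return the rows in the same order. Consecutive pages can therefore overlap, so some rows are read twice while others are skipped. That MySQL happened to return a stable order is an implementation detail, not a guarantee. Order by a unique column, such as the primary key, so that every page query sees the same deterministic sequence.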
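The effect can be reproduced without a database. The toy model below is a sketch, not Spring Batch code: each page is an independent "query" with an offset and a limit, and the absence of ORDER BY is modeled crudely as a different shuffle per query. PostgreSQL does not literally shuffle rows, but the principle is the same: nothing ties the row order of one page query to the next. It also suggests why smaller pages and single-threading did not help, since the overlap comes from each query's ordering rather than from page size or concurrency.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/**
 * Toy model of offset-based paging. Each page is a separate query; without
 * ORDER BY the simulated database returns rows in an arbitrary order per query,
 * modeled here by shuffling with a different seed for every page.
 */
public class PagingOrderDemo {

    /** One page query: OFFSET offset LIMIT limit, with or without ORDER BY id. */
    static List<Integer> fetchPage(List<Integer> table, int offset, int limit,
                                   boolean orderById, long querySeed) {
        List<Integer> rows = new ArrayList<>(table);
        if (orderById) {
            Collections.sort(rows);                           // deterministic order
        } else {
            Collections.shuffle(rows, new Random(querySeed)); // order not guaranteed
        }
        int from = Math.min(offset, rows.size());
        int to = Math.min(offset + limit, rows.size());
        return new ArrayList<>(rows.subList(from, to));
    }

    /** Reads the whole table page by page until a short or empty page is returned. */
    static List<Integer> readAll(List<Integer> table, int pageSize, boolean orderById) {
        List<Integer> read = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<Integer> rows = fetchPage(table, page * pageSize, pageSize, orderById, page);
            read.addAll(rows);
            if (rows.size() < pageSize) {
                return read;
            }
        }
    }

    /** Counts how many ids were read more than once. */
    static int countDuplicates(List<Integer> ids) {
        Set<Integer> seen = new HashSet<>();
        int duplicates = 0;
        for (int id : ids) {
            if (!seen.add(id)) {
                duplicates++;
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<Integer> table = new ArrayList<>();
        for (int id = 1; id <= 1000; id++) {
            table.add(id);
        }
        for (boolean orderById : new boolean[] {false, true}) {
            List<Integer> read = readAll(table, 100, orderById);
            int duplicates = countDuplicates(read);
            // every duplicate read means some other row was never read at all
            int missing = table.size() - (read.size() - duplicates);
            System.out.printf("orderById=%b read=%d duplicates=%d missing=%d%n",
                    orderById, read.size(), duplicates, missing);
        }
    }
}
```

Applied to the question, the named query would become something like `select h from ImportDTO h where h.toBeImported = true order by h.id`, assuming ImportDTO has a unique `id` attribute (that attribute name is an assumption).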
Regarding "java - Spring Batch - JpaPagingItemReader - works in MySQL - duplicates in PostgreSQL", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48684224/