java - Spring Batch - JpaPagingItemReader - 在 MySQL 中工作 - 在 PostgreSQL 中重复

标签 java mysql postgresql jpa spring-batch

在 Spring Batch 作业运行之前。我有一个导入表,其中包含需要导入到我们系统中的所有项目。此时已验证它仅包含我们系统中不存在的项目。

接下来我有一个 Spring Batch 作业,它使用 JpaPagingItemReader 从此导入表中读取数据。 工作完成后,它使用 ItemWriter 写入数据库。

我以 10000 的页面大小和 block 大小运行。 现在,当在 MySQL innoDB 上运行时,这绝对可以正常工作。我什至可以使用多线程,一切正常。

但是现在我们正在迁移到 PostgreSQL,同样的批处理作业遇到了一些非常奇怪的问题 发生的情况是它尝试将重复项插入我们的系统。这自然会被唯一索引约束拒绝并抛出错误。 由于导入数据库表在批处理作业开始之前被验证为仅包含不存在的内容,因此我能想到的唯一原因是,当我在 Postgres 上运行时,JpaPagingItemReader 从导入数据库表中多次读取某些行。但为什么要这么做呢?

我尝试了很多设置。将 block 和页面大小减小到 100 左右只会使导入速度变慢,但仍然会出现同样的错误。运行单线程而不是多线程只会使错误稍后发生。 那么我的 JpaPagingItemReader 仅在 PostgresSQL 上多次读取相同项目的原因到底是什么? 支持阅读器的 select 语句很简单,它是一个 NamedQuery:

@NamedQuery(name = "ImportDTO.findAllForInsert",
            query = "select h from ImportDTO h where h.toBeImported = true")

另请注意,批处理作业在运行时根本不会更改 toBeImported 标志,因此此查询的结果应始终在批处理作业之前、期间和之后返回相同的结果。

非常感谢任何见解、提示或帮助!

这里是批量配置代码:

import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private OrganizationItemWriter organizationItemWriter;
    @Autowired
    private EntityManagerFactory entityManagerFactory;
    @Autowired
    private OrganizationUpdateProcessor organizationUpdateProcessor;
    @Autowired
    private OrganizationInsertProcessor organizationInsertProcessor;

    private Integer organizationBatchSize = 10000;
    private Integer organizationThreadSize = 3;
    private Integer maxThreadSize = organizationThreadSize;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findNewImportsToImport() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForInsert");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findImportsToUpdate() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForUpdate");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public OrganizationItemWriter writer() throws Exception {
        return organizationItemWriter;
    }

    @Bean
    public StepExecutionNotificationListener stepExecutionListener() {
        return new StepExecutionNotificationListener();
    }

    @Bean
    public ChunkExecutionListener chunkListener() {
        return new ChunkExecutionListener();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(maxThreadSize);
        return taskExecutor;
    }

    @Bean
    public Job importOrganizationsJob(JobCompletionNotificationListener listener) throws Exception {
        return jobBuilderFactory.get("importAndUpdateOrganizationJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(importNewOrganizationsFromImports())
                .next(updateOrganizationsFromImports())
                .build();
    }

    @Bean
    public Step importNewOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("importNewOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findNewImportsToImport())
                .processor(organizationInsertProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }


    @Bean
    public Step updateOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("updateOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findImportsToUpdate())
                .processor(organizationUpdateProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }
}

最佳答案

您需要添加 order by 子句来选择

关于java - Spring Batch - JpaPagingItemReader - 在 MySQL 中工作 - 在 PostgreSQL 中重复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48684224/

相关文章:

java - AndroidPlot 和 RectRegion

Java - 以编程方式剪切字符串

java - 一段代码的后置条件

mysql - 在mysql存储过程中将varchar转换为tinyint(4)

sql - 计算不包括周末和节假日的日期之间的天数的初学者方法

postgresql - 如何显示 PostgreSQL 中某行的旧版本?

java - "default character encoding"和 "native character encoding"的含义是什么?

php - 来自 phpmyadmin 数据库的 Google 图表

mysql - 如何安排 MySQL 查询?

c++ - PostgreSQL C API (libpq) 是否允许您使用结果而不是存储结果?