Spring 批处理不处理所有记录

标签 spring spring-boot spring-batch batch-processing spring-batch-tasklet

我正在使用 Spring Batch 使用 RepositoryItemReader 从 postgresql DB 读取记录,然后将其写入主题。 我看到大约有 100 万条记录需要处理,但它没有处理所有记录。 我已将阅读器的 pageSize 设置为 10,000 并与提交间隔( block 大小)相同

@Bean
public TaskletStep broadcastProductsStep(){
    return stepBuilderFactory.get("broadcastProducts")
            .<Product, Product> chunk(10000)
            .reader(productsReader.repositoryItemReader())
            .processor(productsProcessor)
            .writer(compositeItemWriter)                    
            .faultTolerant()
            .skip(Exception.class)                              
            .skipLimit(100000)
            .processorNonTransactional()                        
            .listener(new SkipListenerProducts())               
            .listener(productsChunkListener)
            .build();
}


@Bean
public RepositoryItemReader repositoryItemReader() {

    RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();

    try {
        repositoryReader.setRepository(skuRepository);
        repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
        repositoryReader.setPageSize(10000);
        repositoryReader.setSaveState(false);

        List<List<String>> arguments = new ArrayList<>();
        arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(), SkuStatus.BLOCKED.getValue().toString(),
                SkuStatus.DISCONTINUED.getValue().toString())
                  .collect(Collectors.toList()));
        repositoryReader.setArguments(arguments);

        Map sorts = new HashMap();
        sorts.put("catalog_number", Sort.Direction.ASC);

        repositoryReader.setSort(sorts);
        repositoryReader.afterPropertiesSet();

    } catch (Exception exception){
        exception.printStackTrace();
    }

    return repositoryReader;
}

@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode, 
        Pageable pageable);

最佳答案

问题可能是您根据读者查询的条件混合分页和更新 (IS_UPDATED)。

页面大小 = 2 且数据库中有 6 行的示例

  • A IS_UPDATED=true
  • B IS_UPDATED=true
  • C IS_UPDATED=true
  • D IS_UPDATED=true
  • E IS_UPDATED=true
  • F IS_UPDATED=true

首次读取页面 = 1 返回行 A 和 B

编写器执行后(将 A 和 B 的 IS_UPDATED 设置为 false),我们在数据库中有:

  • C IS_UPDATED=true
  • D IS_UPDATED=true
  • E IS_UPDATED=true
  • F IS_UPDATED=true

第二次阅读将移至第 2 页,因此将采用 E 和 F 行而不是 C 和 D

要么:

  1. 您不应更新 IS_UPDATED 列。
  2. 或者您创建 RepositoryItemReader 的子类并在其中重写 getPage
    @Override
    public int getPage() {
        return 0;
    }

选项 2 更能适应批量崩溃/错误,但您必须确保编写器中的 IS_UPDATED 始终设置为 false,否则读取器将无限循环。

如果您使用多线程步骤,选项 2 还需要更多调整。

关于 Spring 批处理不处理所有记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58409247/

相关文章:

java - 我需要 XA 交易吗? DefaultMessageListenerContainer 本地事务与 XA 连接工厂

java - Spring批处理并行处理两个任务,但第二个任务依赖于第一个任务

mysql - Spring Batch JpaPagingItemReader(可能)在使用 SELECT native 查询时从数据库中删除行

java - 使用 spring-data-jpa 自定义 ItemReader

java - 当上下文关闭时,已经创建的 bean 会发生什么

java - Spring & Maven & JUnit - BeanCreationException : Could not autowire field NoSuchBeanDefinitionException: No qualifying bean of type

java - 在 Spring Cloud 应用程序中实现重试

java - @SpringBootTest 用于非 spring-boot 应用程序

mysql - 在 Spring Boot 中运行测试时启动临时 mysql 容器

java - 使用 spring-boot 和 Weblogic 公开 SOAP Webservice