我正在使用 Spring Batch 使用 RepositoryItemReader 从 postgresql DB 读取记录,然后将其写入主题。 我看到大约有 100 万条记录需要处理,但它没有处理所有记录。 我已将阅读器的 pageSize 设置为 10,000 并与提交间隔( block 大小)相同
@Bean
public TaskletStep broadcastProductsStep(){
return stepBuilderFactory.get("broadcastProducts")
.<Product, Product> chunk(10000)
.reader(productsReader.repositoryItemReader())
.processor(productsProcessor)
.writer(compositeItemWriter)
.faultTolerant()
.skip(Exception.class)
.skipLimit(100000)
.processorNonTransactional()
.listener(new SkipListenerProducts())
.listener(productsChunkListener)
.build();
}
@Bean
public RepositoryItemReader repositoryItemReader() {
RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();
try {
repositoryReader.setRepository(skuRepository);
repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
repositoryReader.setPageSize(10000);
repositoryReader.setSaveState(false);
List<List<String>> arguments = new ArrayList<>();
arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(), SkuStatus.BLOCKED.getValue().toString(),
SkuStatus.DISCONTINUED.getValue().toString())
.collect(Collectors.toList()));
repositoryReader.setArguments(arguments);
Map sorts = new HashMap();
sorts.put("catalog_number", Sort.Direction.ASC);
repositoryReader.setSort(sorts);
repositoryReader.afterPropertiesSet();
} catch (Exception exception){
exception.printStackTrace();
}
return repositoryReader;
}
@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode,
Pageable pageable);
最佳答案
问题可能是您根据读者查询的条件混合分页和更新 (IS_UPDATED)。
页面大小 = 2 且数据库中有 6 行的示例
- A IS_UPDATED=true
- B IS_UPDATED=true
- C IS_UPDATED=true
- D IS_UPDATED=true
- E IS_UPDATED=true
- F IS_UPDATED=true
首次读取页面 = 1 返回行 A 和 B
编写器执行后(将 A 和 B 的 IS_UPDATED 设置为 false),我们在数据库中有:
- C IS_UPDATED=true
- D IS_UPDATED=true
- E IS_UPDATED=true
- F IS_UPDATED=true
第二次阅读将移至第 2 页,因此将采用 E 和 F 行而不是 C 和 D
要么:
- 您不应更新 IS_UPDATED 列。
- 或者您创建
RepositoryItemReader
的子类并在其中重写 getPage
@Override
public int getPage() {
return 0;
}
选项 2 更能适应批量崩溃/错误,但您必须确保编写器中的 IS_UPDATED 始终设置为 false,否则读取器将无限循环。
如果您使用多线程步骤,选项 2 还需要更多调整。
关于 Spring 批处理不处理所有记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58409247/