java - 如何跳过 block 中的任何错误并继续下一个 block 项目?

标签 java spring-boot exception spring-batch

我创建了读取器、处理器和写入器。 我将 block 的大小定义为 5。 我在处理器中对每一项都有一个操作。 我在 Writer 中有两笔交易。更新所有 5 件商品的数据库,并在另一个地方确认所有 5 件商品的交易。 我的项目不相互依赖,因此如果其中一个项目失败,其他项目并不关心,它们希望继续进行。

用例 1:

如果处理器因任何类型的异常(RESTful 异常、任何 java 异常、DB 异常、运行时异常)而失败,比如说第二项,我想继续第三项、第四项和第五项。 如果第四项失败,我想继续第五项。 因此,根据我的理解,通过跳过,当处理器中包含失败项目的这个 block 失败时,我可以重复这个 block ,但没有第二个和第四个项目(失败),对吗? 如果 Writer 运行顺利,两个事务都会在 chunk 和 jog 开始下一个 chunk 并包含接下来的 5 个项目之后提交,对吗?

用例 2:

无论 block 是新的还是重复的,用例 1 没有这 2 个项目,如果在 Writer 中第二个事务失败,我想回滚第一个事务,而不需要手动执行回滚和提交。 因此,如果 Write 抛出异常,它将自动回滚第一个事务。 这很好。 但我想要的是,即使出现异常和事务回滚(对于该 block ),我也想以相同的方式、相同的行为继续处理下一个 block ,依此类推到最后一个 block 。

为了实现用例 1,我想我必须将步骤配置为:

@Configuration
@EnableBatchProcessing
@EnableScheduling
@Slf4j
public class BatchConfiguration {

private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final MyItemReader myItemReader;
private final MyItemProcessor myItemProcessor;
private final MyItemWriter myItemWriter;

private final SimpleJobExecutionListener simpleJobExecutionListener;
private final MyChunkListener myChunkListener;

private final ApplicationContext applicationContext;
private final DataSource dataSource;





public BatchConfiguration(
        JobBuilderFactory jobBuilderFactory,
        StepBuilderFactory stepBuilderFactory,
        MyItemReader myItemReader,
        MyItemProcessor myItemProcessor,
        MyItemtWriter myItemWriter,
        SimpleJobExecutionListener simpleJobExecutionListener,
        MyChunkListener myChunkTransactionListener,
        DataSource dataSource,
        ApplicationContext applicationContext) {
    this.jobBuilderFactory = jobBuilderFactory;
    this.stepBuilderFactory = stepBuilderFactory;
    this.myItemReader = myItemReader;
    this.myItemProcessor = myItemProcessor;
    this.myItemWriter = myItemWriter;
    this.simpleJobExecutionListener = simpleJobExecutionListener;
    this.myChunkListener = myChunkListener;
    this.dataSource = dataSource;
    this.applicationContext = applicationContext;
}

@Bean
public Job registrationChunkJob() {
    return jobBuilderFactory.get("MyJob")
            .incrementer(new RunIdIncrementer())
            .listener(simpleJobExecutionListener)
            .flow(step()).end().build();
}

@Bean
TaskExecutor taskExecutorStepPush() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(2);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setQueueCapacity(4);
    taskExecutor.setAllowCoreThreadTimeOut(true);
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setThreadNamePrefix(LoggingUtil.getWeblogicName() + "-");
    return taskExecutor;
}

@Bean
public Step step() {
    DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
    attribute.setPropagationBehavior(Propagation.REQUIRED.value());
    attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

    return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
            .reader(myItemReader)
            .processor(myItemProcessor)
            .faultTolerant()
            .writer(myItemWriter)
            .listener(myChunkListener)
            .taskExecutor(taskExecutorStepPush())
            .throttleLimit(5)
            .transactionAttribute(attribute)
            .build();
}

我的工作没有安排。当当前工作完成时,无论成功与否,我都会手动开始下一个工作。 正如我所说,我不会从 Writer 更改 DB 中的标志,因此如果失败并且某些数据被跳过并且未在 DB(Writer)中更新,当作业完成时,1 小时后它将启动新作业并尝试相同的操作(可能是新的)来自数据库的项目(读者将选择它们,因为标记不会在处理时更新)。

但不知何故,这不起作用,而且已经晚了,我不明白为什么。 它在 block 中需要 5 个项目,并且在处理器中没有失败,但在尝试提交 2 个事务时在写入器中失败(第二个失败)。它重复 block ,但仅使用一个项目,使用第一个项目,并尝试两次(使用一个项目,第一个项目),然后将作业标记为失败并停止。这是我不想要的。数据库中有很多项目可供选择,其中可能是不错的。

如果 Writer 失败,我不想重复相同的 block 。我只想在处理器中失败时重复 block (仅获得好的 block )。 另外,如果 block 失败,我不希望作业停止,我希望作业继续处理下一个 block ,依此类推...... 如何实现这一目标?

最佳答案

How to skip any error in chunk and to continue with next items?

为此,您需要配置哪些异常应导致跳过该项目,如 Configuring Skip Logic 中所述。部分。

根据您的配置,您没有指定任何可跳过的异常。您的步骤定义应该类似于:

@Bean
public Step step() {
   DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
   attribute.setPropagationBehavior(Propagation.REQUIRED.value());
   attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

   return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
        .reader(myItemReader)
        .processor(myItemProcessor)
        .faultTolerant()
        // add skip configuration
        .skipLimit(10)
        .skip(MySkippableException.class)
        .writer(myItemWriter)
        .listener(myChunkListener)
        .taskExecutor(taskExecutorStepPush())
        .throttleLimit(5)
        .transactionAttribute(attribute)
        .build();
}

关于java - 如何跳过 block 中的任何错误并继续下一个 block 项目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60083276/

相关文章:

java - Junit5测试无法注入(inject)依赖项,但正常运行应用程序可以注入(inject)它

c++ - 捕获命名空间限定的异常

c# - 异常处理循环拼图

java - HQL - 通过电子邮件或带点 ('.' 的字符串查询始终返回空集

spring-boot - 使用 JpaRepository 将 protobuf 直接保存为实体

java - Camel 失败的核心测试用例

c# - 使用 Nunit 测试异常

java - 如何在JPA中实现复杂的实体关系

java - Cobertura 2.0.3 和 Sonar 未显示代码覆盖率

java - Cursor.hasNext 抛出 java.util.NoSuchElementException