spring-batch - How does Spring Batch CompositeItemWriter manage transactions for delegate writers?

Tags: spring-batch compositeitemwriter

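In a batch job step configuration, I plan to execute 2 queries in the writer: the 1st query updates records in table A, and the 2nd query then inserts new records into table A.

So far I think CompositeItemWriter can achieve this goal, i.e. I need to create 2 JdbcBatchItemWriters, one for the update and another one for the insert.

My first question is whether CompositeItemWriter is a fit for the requirement above.

If yes, that leads to the second question, about transactions. For example, suppose the first update succeeds and the second insert fails. Will the first update be rolled back automatically? If not, how can I manually put both statements in the same transaction?

Thanks in advance!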

Best answer

My first question is if CompositeItemWriter is a fit for the requirement above?



Yes, CompositeItemWriter is the way to go.

If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?



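Good question! Yes, if the update succeeds in the first writer and the insert then fails in the second writer, all statements are rolled back automatically. What you need to know is that the transaction surrounds the execution of the chunk-oriented tasklet step for each chunk (and therefore surrounds the write method of the composite item writer). Hence, all SQL statements executed within this method (that is, in the delegate writers) are atomic.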
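
As a rough mental model of that transaction boundary, here is a plain-Java sketch. It is not Spring Batch code: the HashMap stands in for table A, the snapshot stands in for the database transaction, and the names ChunkTransactionSketch, writeChunk and runFailingChunk are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: a HashMap stands in for table A and a snapshot stands in for
// the database transaction. None of these names are Spring Batch APIs.
class ChunkTransactionSketch {

    // Runs every delegate in order inside one "transaction".
    // Returns true on commit, false if a delegate failed and all changes were undone.
    static boolean writeChunk(Map<Integer, String> table,
                              List<Consumer<Map<Integer, String>>> delegates) {
        Map<Integer, String> snapshot = new HashMap<>(table); // "begin transaction"
        try {
            for (Consumer<Map<Integer, String>> delegate : delegates) {
                delegate.accept(table); // the composite calls each delegate in turn
            }
            return true; // "commit"
        } catch (RuntimeException e) {
            table.clear();
            table.putAll(snapshot); // "rollback": the earlier update is undone as well
            return false;
        }
    }

    // Scenario from the question: the update succeeds, then the insert fails.
    static Map<Integer, String> runFailingChunk() {
        Map<Integer, String> people = new HashMap<>();
        people.put(1, "foo");

        Consumer<Map<Integer, String>> update = table -> table.put(1, "foo!!");
        Consumer<Map<Integer, String>> insert = table -> {
            table.put(2, "bar");
            throw new IllegalStateException("Something went wrong!");
        };

        writeChunk(people, Arrays.asList(update, insert));
        return people;
    }

    public static void main(String[] args) {
        System.out.println(runFailingChunk()); // prints {1=foo}
    }
}
```

Because the failure happens inside the same unit of work as the update, the table ends up exactly as it started. In a real step the database transaction plays the role of the snapshot.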

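To illustrate this use case, I wrote the following test:
  • Given a table people with two columns, id and name, containing a single record: 1,'foo'
  • Suppose a job reads two records ( 1,'foo' and 2,'bar' ) and tries to update foo to foo!! and then insert 2,'bar' into the table. This is done with a CompositeItemWriter that has two delegate item writers: UpdateItemWriter and InsertItemWriter
  • The use case is that UpdateItemWriter succeeds but InsertItemWriter fails (by throwing an exception)
  • The expected result is that foo is not updated to foo!! and bar is not inserted into the table (both SQL statements are rolled back because of the exception in InsertItemWriter)

Here is the code (it is self-contained, so you can try it and see how things work; it uses an embedded hsqldb database, which should be on your classpath):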
    import java.util.Arrays;
    import java.util.List;
    import javax.sql.DataSource;
    
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.CompositeItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.test.JobLauncherTestUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.test.jdbc.JdbcTestUtils;
    
    @RunWith(SpringRunner.class)
    @ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
    public class TransactionWithCompositeWriterTest {
    
        @Autowired
        private JobLauncherTestUtils jobLauncherTestUtils;
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        @Before
        public void setUp() {
            jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
        }
    
        @Test
        public void testTransactionRollbackWithCompositeWriter() throws Exception {
            // given
            int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
            int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
            int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
            Assert.assertEquals(1, peopleCount);
            Assert.assertEquals(1, fooCount);
            Assert.assertEquals(0, barCount);
    
            // when
            JobExecution jobExecution = jobLauncherTestUtils.launchJob();
    
            // then
            Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
            Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
            StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
            Assert.assertEquals(0, stepExecution.getCommitCount());
            Assert.assertEquals(1, stepExecution.getRollbackCount());
            Assert.assertEquals(0, stepExecution.getWriteCount());
    
            peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
            fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
            barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
            Assert.assertEquals(1, peopleCount); // bar is not inserted
            Assert.assertEquals(0, barCount); // bar is not inserted
            Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
        }
    
        @Configuration
        @EnableBatchProcessing
        public static class JobConfiguration {
    
            @Bean
            public DataSource dataSource() {
                return new EmbeddedDatabaseBuilder()
                        .setType(EmbeddedDatabaseType.HSQL)
                        .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                        .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                        .build();
            }
    
            @Bean
            public JdbcTemplate jdbcTemplate(DataSource dataSource) {
                return new JdbcTemplate(dataSource);
            }
    
            @Bean
            public ItemReader<Person> itemReader() {
                Person foo = new Person(1, "foo");
                Person bar = new Person(2, "bar");
                return new ListItemReader<>(Arrays.asList(foo, bar));
            }
    
            @Bean
            public ItemWriter<Person> updateItemWriter() {
                return new UpdateItemWriter(dataSource());
            }
    
            @Bean
            public ItemWriter<Person> insertItemWriter() {
                return new InsertItemWriter(dataSource());
            }
    
            @Bean
            public ItemWriter<Person> itemWriter() {
                CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
                compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
                return compositeItemWriter;
            }
    
            @Bean
            public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
                return jobBuilderFactory.get("job")
                        .start(stepBuilderFactory
                                .get("step").<Person, Person>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build())
                        .build();
            }
    
            @Bean
            public JobLauncherTestUtils jobLauncherTestUtils() {
                return new JobLauncherTestUtils();
            }
        }
    
        public static class UpdateItemWriter implements ItemWriter<Person> {
    
            private JdbcTemplate jdbcTemplate;
    
            public UpdateItemWriter(DataSource dataSource) {
                this.jdbcTemplate = new JdbcTemplate(dataSource);
            }
    
            @Override
            public void write(List<? extends Person> items) {
                for (Person person : items) {
                    if ("foo".equalsIgnoreCase(person.getName())) {
                        jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
                    }
                }
            }
        }
    
        public static class InsertItemWriter implements ItemWriter<Person> {
    
            private JdbcTemplate jdbcTemplate;
    
            public InsertItemWriter(DataSource dataSource) {
                this.jdbcTemplate = new JdbcTemplate(dataSource);
            }
    
            @Override
            public void write(List<? extends Person> items) {
                for (Person person : items) {
                    if ("bar".equalsIgnoreCase(person.getName())) {
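                        // The insert itself is executed successfully...
                        jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
                        // ...then a failure is simulated, which should roll back the whole chunk transaction
                        throw new IllegalStateException("Something went wrong!");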
                    }
                }
            }
        }
    
        public static class Person {
    
            private long id;
    
            private String name;
    
            public Person() {
            }
    
            public Person(long id, String name) {
                this.id = id;
                this.name = name;
            }
    
            public long getId() {
                return id;
            }
    
            public void setId(long id) {
                this.id = id;
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
        }
    }
    

My example uses custom item writers, but this should work with two JdbcBatchItemWriters as well.
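
For reference, a configuration with two JdbcBatchItemWriters might look roughly like the sketch below. It was not run as part of the test above. The SQL statements, the active column, the bean names and the JdbcCompositeWriterConfiguration class are illustrative assumptions; a DataSource bean is assumed to exist elsewhere, and Person is a cut-down copy of the class from the test.

```java
import java.util.Arrays;
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JdbcCompositeWriterConfiguration {

    // First delegate: updates existing rows (assumed SQL, hypothetical "active" column).
    @Bean
    public JdbcBatchItemWriter<Person> updateWriter(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource)
                .sql("UPDATE people SET active = FALSE WHERE name = :name")
                .beanMapped()          // binds :name from Person.getName()
                .assertUpdates(false)  // assumption: an item may match no existing row
                .build();
    }

    // Second delegate: inserts the new rows (assumed SQL).
    @Bean
    public JdbcBatchItemWriter<Person> insertWriter(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource)
                .sql("INSERT INTO people (id, name, active) VALUES (:id, :name, TRUE)")
                .beanMapped()
                .build();
    }

    // Every delegate receives every item of the chunk, in the order listed here.
    @Bean
    public CompositeItemWriter<Person> compositeWriter(DataSource dataSource) {
        CompositeItemWriter<Person> writer = new CompositeItemWriter<>();
        writer.setDelegates(Arrays.asList(updateWriter(dataSource), insertWriter(dataSource)));
        return writer;
    }

    // Cut-down item type; beanMapped() reads its getters.
    public static class Person {
        private final long id;
        private final String name;

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public String getName() {
            return name;
        }
    }
}
```

Since both delegates see every item, each statement has to be valid for every item in the chunk. By default JdbcBatchItemWriter fails when a statement affects no row, which is why the update writer in this sketch turns assertUpdates off.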

I hope this helps!

Regarding "spring-batch - How does Spring Batch CompositeItemWriter manage transactions for delegate writers?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51904498/
