在批处理作业步骤配置中,我计划在 writer 中执行 2 次查询,第 1 次查询是更新表 A 中的记录,然后第 2 次查询是再次在表 A 中插入新记录。
到目前为止,我认为 CompositeItemWriter 可以实现我上面的目标,即我需要创建 2 个 JdbcBatchItemWriters,一个用于更新,另一个用于插入。
我的第一个问题是 CompositeItemWriter 是否适合上述要求?
如果是,那就引出关于交易的第二个问题。例如,如果第一次更新成功,第二次插入失败。第一次更新事务会自动回滚吗?否则,如何在同一个事务中手动提取两个更新?
提前致谢!
最佳答案
My first question is if CompositeItemWriter is a fit for the requirement above?
是的,
CompositeItemWriter
是要走的路。If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?
好问题!是的,如果在第一个写入器中更新成功,然后在第二个写入器中插入失败,则所有语句将自动回滚。您需要知道的是事务围绕面向 block 的 tasklet 步骤的执行(因此围绕复合项目编写器的
write
方法)。因此,此方法中所有 sql 语句的执行(在委托(delegate)编写器中执行)将是原子的。为了说明这个用例,我编写了以下测试:
people
有两列 id
和 name
里面只有一条记录:1,'foo'
1,'foo'
, 2,'bar'
)并尝试更新 foo
至foo!!
然后插入 2,'bar'
在表中。这是通过 CompositeItemWriter
完成的。有两个项目作者:UpdateItemWriter
和 InsertItemWriter
UpdateItemWriter
成功但 InsertItemWriter
失败(抛出异常)foo
未更新为 foo!!
和 bar
未插入表中(由于InsertItemWriter
中的异常,两条sql语句都回滚了)这是代码(它是独立的,因此您可以尝试一下,看看它是如何工作的,它使用了一个应该在您的类路径中的嵌入式 hsqldb 数据库):
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Before
public void setUp() {
jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
}
@Test
public void testTransactionRollbackWithCompositeWriter() throws Exception {
// given
int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount);
Assert.assertEquals(1, fooCount);
Assert.assertEquals(0, barCount);
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
// then
Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
Assert.assertEquals(0, stepExecution.getCommitCount());
Assert.assertEquals(1, stepExecution.getRollbackCount());
Assert.assertEquals(0, stepExecution.getWriteCount());
peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount); // bar is not inserted
Assert.assertEquals(0, barCount); // bar is not inserted
Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
}
@Configuration
@EnableBatchProcessing
public static class JobConfiguration {
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public ItemReader<Person> itemReader() {
Person foo = new Person(1, "foo");
Person bar = new Person(2, "bar");
return new ListItemReader<>(Arrays.asList(foo, bar));
}
@Bean
public ItemWriter<Person> updateItemWriter() {
return new UpdateItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> insertItemWriter() {
return new InsertItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> itemWriter() {
CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
return compositeItemWriter;
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory
.get("step").<Person, Person>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
}
public static class UpdateItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public UpdateItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("foo".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
}
}
}
}
public static class InsertItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public InsertItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("bar".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
throw new IllegalStateException("Something went wrong!");
}
}
}
}
public static class Person {
private long id;
private String name;
public Person() {
}
public Person(long id, String name) {
this.id = id;
this.name = name;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
我的示例使用自定义项目编写器,但这应该适用于两个
JdbcBatchItemWriter
s 也是。我希望这有帮助!
关于spring-batch - Spring Batch CompositeItemWriter 如何为委托(delegate)编写者管理事务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51904498/