我需要将大量数据写入mysql。
我想在多个线程或性能中完成它。我想使用 spring 批处理分区,但以前从未这样做过。
我的 Spring Batch Java 配置(部分):
@Bean
ItemWriter<Event> writer() throws SQLException {
return new CustomJdbcBatchDataWriter();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
asyncTaskExecutor.setConcurrencyLimit(threadsAmount);
return asyncTaskExecutor;
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1())
.end()
.build();
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Event, Event>chunk(chunkSize)
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.build();
}
还有我的自定义 jdbc 编写器(必须编写它来禁用自动提交 - 以提高性能):
公共(public)类 CustomJdbcBatchDataWriter 实现 ItemWriter {
@Override
public void write(List<? extends Event> items) throws Exception {
try (
Connection connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/batch?useSSL=false&useServerPrepStmts=false&rewriteBatchedStatements=true",
"user", "password") ) {
connection.setAutoCommit(false);
String sql = "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
PreparedStatement ps = connection.prepareStatement(sql);
for (Event p : items) {
try {
ps.setString(1, p.getId());
//Setting rest of data into prepared statement
...
ps.addBatch();
} catch (SQLException e) {
e.printStackTrace();
}
}
ps.executeBatch();
connection.commit();
}
}
}
我如何配置它,以便它在不同的线程中插入不同的数据以获得性能? 非常感谢任何帮助。
最佳答案
这里有一些可以帮助您入门的东西。我还没有测试过它,但它至少应该让你接近。
//updated this bean of yours. the others are new
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(partitionStep())
.end()
.build();
}
@Bean
public Step partitionStep(){
return stepBuilderFactory.get("partitionStep")
.partitioner(step1()) //leverage the step you already have
.partitioner("step1", partitioner())
.gridSize(10) //# of threads
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Partitioner partitioner() {
//Use this partitioner to add ranges for your reader
//NOTE: your reader needs to be in @StepScope to pull from the Step Execution Context
return new YourCustomPartitioner();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
关于java - Spring Batch(Spring Boot - Java配置)多线程jdbc编写器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42935845/