我在 Spring Batch 中创建异步处理器时遇到问题。
我的处理器正在从 reader
获取 ID
并根据来自 SOAP
调用的响应创建对象。有时对于 1 个输入 (ID
) 必须有例如60-100 个 SOAP
调用,有时只有 1 个。我尝试进行多线程步骤,它一次处理例如 50 个输入,但它没有用,因为 49 个线程在 1 秒内完成工作并被阻塞,等待这个正在执行 60-100 个 SOAP
调用。现在我使用 AsyncItemProcessor
+AsyncItemWriter
但这个解决方案对我来说工作缓慢。由于我的输入 (IDs
) 很大,从 DB 读取的大约 25k 个项目我想一次开始约 50-100 个输入。
这是我的配置:
@Configuration
public class BatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private DatabaseConfig databaseConfig;
@Value(value = "classpath:Categories.txt")
private Resource categories;
@Bean
public Job processJob() throws Exception {
return jobBuilderFactory.get("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(orderStep1()).end().build();
}
@Bean
public Step orderStep1() throws Exception {
return stepBuilderFactory.get("orderStep1").<Category, CategoryDailyResult>chunk(1).reader(reader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).taskExecutor(taskExecutor()).build();
}
@Bean
public JobExecutionListener listener() {
return new JobCompletionListener();
}
@Bean
public ItemWriter asyncItemWriter() {
AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(itemWriter());
return asyncItemWriter;
}
@Bean
public ItemWriter<CategoryDailyResult> itemWriter(){
return new Writer();
}
@Bean
public ItemProcessor asyncItemProcessor() {
AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor());
asyncItemProcessor.setTaskExecutor(taskExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<Category, CategoryDailyResult> itemProcessor(){
return new Processor();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(50);
return taskExecutor;
}
@Bean(destroyMethod = "")
public ItemReader<Category> reader() throws Exception {
String query = "select c from Category c where not exists elements(c.children)";
JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
reader.setSaveState(false);
reader.setQueryString(query);
reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
reader.setPageSize(1);
return reader;
}
}
如何提升我的应用程序?也许我做错了什么?欢迎任何反馈;)
@编辑: 对于 ID 的输入:1 到 100 我想要例如 50 个正在执行处理器的线程。我希望他们不要互相阻止: Thread1 处理输入“1” 2 分钟,此时我希望 Thread2 处理输入“2”、“8”、“64”,这些输入很小,几秒钟内执行。
@Edit2:
我的目标:
我在数据库中有 25k 个 ID,我使用 JpaPagingItemReader
读取它们,并且每个 ID 都由处理器处理。每个项目都是相互独立的。对于每个 ID,我让 SOAP
在循环中调用 0-100 次,然后创建对象,将其传递给 Writer
并保存在数据库中。我如何才能获得此类任务的最佳性能?
最佳答案
你应该划分你的工作。像这样添加一个分区步骤:
@Bean
public Step partitionedOrderStep1(Step orderStep1) {
return stepBuilder.get("partitionedOrderStep1")
.partitioner(orderStep1)
.partitioner("orderStep1", new SimplePartitioner())
.taskExecutor(taskExecutor())
.gridSize(10) //Number of concurrent partitions
.build();
}
然后在您的作业定义中使用该步骤。 .gridSize() 调用配置要同时执行的分区数。如果您的任何 Reader、Processor 或 Writer 对象是有状态的,您需要使用 @StepScope 对其进行注释。
关于java - Spring Batch 异步处理器配置以获得最佳性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45759058/