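We are currently migrating a batch job from plain Java to Spring Batch. The job takes its input from a database and from web services. Because it processes a large volume of data, we need to run it on four servers to improve performance.
Can this scenario be implemented with remote partitioning in Spring Batch?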
Best answer
I'd like to share a remote partitioning example. You can find the full sources here
Master application:
Batch configuration:
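import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningMasterStepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@Import(value = {BrokerConfiguration.class})
public class MasterConfiguration {

    // Number of partitions the master creates; idle workers on any server pick them up
    private static final int GRID_SIZE = 3;

    private final JobBuilderFactory jobBuilderFactory;
    // Spring Batch 4.1 API; later versions provide RemotePartitioningManagerStepBuilderFactory instead
    private final RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;

    public MasterConfiguration(JobBuilderFactory jobBuilderFactory,
                               RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.masterStepBuilderFactory = masterStepBuilderFactory;
    }

    /*
     * Configure outbound flow (requests going to workers)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(requests())
                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
                .get();
    }

    /*
     * Configure inbound flow (replies coming from workers)
     */
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(replies())
                .get();
    }

    /*
     * Configure the master step: split the work with BasicPartitioner, send one request
     * per partition on "requests" and aggregate the workers' replies from "replies"
     */
    @Bean
    public Step masterStep() {
        return this.masterStepBuilderFactory.get("masterStep")
                // "workerStep" must match the name of the step defined on the workers
                .partitioner("workerStep", new BasicPartitioner())
                .gridSize(GRID_SIZE)
                .outputChannel(requests())
                .inputChannel(replies())
                .build();
    }

    @Bean
    public Job remotePartitioningJob() {
        return this.jobBuilderFactory.get("remotePartitioningJobMy")
                .incrementer(new RunIdIncrementer())
                .start(masterStep())
                .build();
    }
}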
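At runtime the master step publishes one request per partition to the `requests` destination, whichever worker is idle takes it (a JMS queue delivers each message to a single consumer), and the master waits for one reply per partition on `replies`. The in-process simulation below is only an illustration of that message flow, not Spring Batch code: `BlockingQueue`s stand in for the two JMS destinations, threads stand in for the worker servers, and the class name `PartitionFlowSimulation` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * In-process simulation of the remote-partitioning message flow (no Spring, no JMS).
 * The two queues stand in for the "requests" and "replies" destinations and each
 * thread stands in for one worker server. All names here are illustrative only.
 */
final class PartitionFlowSimulation {

    /** Sends gridSize partition requests to workerCount competing workers and returns their replies. */
    static List<String> run(int gridSize, int workerCount) {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        try {
            // Workers: each takes the next request from the shared queue (competing consumers),
            // "processes" it and sends a reply
            for (int w = 0; w < workerCount; w++) {
                String workerName = "worker" + w;
                workers.execute(() -> {
                    try {
                        while (!Thread.currentThread().isInterrupted()) {
                            String partition = requests.take();
                            replies.put(partition + " processed by " + workerName);
                        }
                    } catch (InterruptedException e) {
                        // shutdownNow() interrupts idle workers; exit quietly
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Master: one request per partition (like the master step writing to "requests")
            for (int i = 0; i < gridSize; i++) {
                requests.put("partition" + i);
            }
            // Master: block until one reply per partition has arrived (like aggregating "replies")
            List<String> received = new ArrayList<>();
            for (int i = 0; i < gridSize; i++) {
                received.add(replies.take());
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for replies", e);
        } finally {
            workers.shutdownNow(); // interrupts workers still blocked on requests.take()
        }
    }

    public static void main(String[] args) {
        // 3 partitions (GRID_SIZE in the configuration) shared by 4 workers (the four servers)
        run(3, 4).forEach(System.out::println);
    }
}
```

The order of the printed replies and which worker handles which partition vary from run to run; what stays fixed is that every partition is handled once and the master only finishes after all replies are in.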
Partitioner:
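import java.util.Map;

import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.batch.item.ExecutionContext;

// Creates gridSize empty partitions (via SimplePartitioner) and stores a label in each
// ExecutionContext; the worker's step-scoped tasklet reads it back as 'partition'
public class BasicPartitioner extends SimplePartitioner {

    private static final String PARTITION_KEY = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = super.partition(gridSize);
        int i = 0;
        for (ExecutionContext context : partitions.values()) {
            context.put(PARTITION_KEY, PARTITION_KEY + (i++));
        }
        return partitions;
    }
}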
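`BasicPartitioner` only attaches a label to each partition. For the database input described in the question, each partition's `ExecutionContext` would more typically carry a slice of the data, for example a primary-key range. The plain-Java sketch below shows only the splitting arithmetic, with a `Map` standing in for each `ExecutionContext`; it uses no Spring types, and the class `IdRangeSplitter` and the keys `minId`/`maxId` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Plain-Java sketch of range-based partitioning (no Spring dependency).
 * Each inner map stands in for one partition's ExecutionContext; the keys
 * "minId"/"maxId" are hypothetical names a worker step could read.
 */
final class IdRangeSplitter {

    /** Splits the inclusive ID range [minId, maxId] into at most gridSize contiguous, non-overlapping ranges. */
    static Map<String, Map<String, Long>> split(long minId, long maxId, int gridSize) {
        if (gridSize < 1 || maxId < minId) {
            throw new IllegalArgumentException("need gridSize >= 1 and minId <= maxId");
        }
        long total = maxId - minId + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division so no ID is left over
        Map<String, Map<String, Long>> partitions = new LinkedHashMap<>();
        long start = minId;
        int i = 0;
        while (start <= maxId) {
            long end = Math.min(start + size - 1, maxId);
            Map<String, Long> context = new LinkedHashMap<>();
            context.put("minId", start);
            context.put("maxId", end);
            partitions.put("partition" + i++, context);
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // IDs 1..10 split for four servers prints:
        // partition0 -> {minId=1, maxId=3}
        // partition1 -> {minId=4, maxId=6}
        // partition2 -> {minId=7, maxId=9}
        // partition3 -> {minId=10, maxId=10}
        split(1, 10, 4).forEach((name, ctx) -> System.out.println(name + " -> " + ctx));
    }
}
```

In an actual `Partitioner` implementation the same loop would create `ExecutionContext` objects and fill them with `putLong`, and the worker's reader would restrict its query to its range (for example `WHERE id BETWEEN :minId AND :maxId`). Ceiling division guarantees that no ID is left over, but with very few IDs it can yield fewer than `gridSize` partitions.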
Broker configuration:
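import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource("classpath:remote-partitioning.properties")
public class BrokerConfiguration {

    // Broker address shared by the master and all workers, read from remote-partitioning.properties
    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(this.brokerUrl);
        // Partition requests and replies travel as serialized Java objects; trusting all packages
        // keeps the demo simple, but production code should trust only the packages it needs
        connectionFactory.setTrustAllPackages(true);
        return connectionFactory;
    }
}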
Launcher:
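import javax.annotation.PostConstruct;

import org.apache.activemq.broker.BrokerService;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

@EnableBatchProcessing
@SpringBootApplication
@Import({BasicPartitioner.class, BrokerConfiguration.class})
public class MasterApplication {

    @Value("${broker.url}")
    private String brokerUrl;

    public static void main(String[] args) {
        SpringApplication.run(MasterApplication.class, args);
    }

    // Starts an embedded ActiveMQ broker inside the master JVM for this demo;
    // the workers on the other servers connect to it through broker.url
    @PostConstruct
    public void init() throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector(brokerUrl);
        broker.start();
    }
}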
Worker application:
Configuration:
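import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@Import(value = {BrokerConfiguration.class})
public class WorkerConfiguration {

    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

    public WorkerConfiguration(RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
        this.workerStepBuilderFactory = workerStepBuilderFactory;
    }

    /*
     * Configure inbound flow (requests coming from the master)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
                .channel(requests())
                .get();
    }

    /*
     * Configure outbound flow (replies going to the master)
     */
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(replies())
                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
                .get();
    }

    /*
     * Configure the worker step: it listens on "requests", runs the partition it receives
     * and sends the resulting StepExecution back on "replies"
     */
    @Bean
    public Step workerStep() {
        return this.workerStepBuilderFactory.get("workerStep")
                .inputChannel(requests())
                .outputChannel(replies())
                // null is only a placeholder: the @StepScope proxy injects the real
                // 'partition' value from the step execution context at runtime
                .tasklet(tasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
        return (contribution, chunkContext) -> {
            System.out.println("processing " + partition);
            return RepeatStatus.FINISHED;
        };
    }
}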
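The tasklet above only prints its partition label. With range-based partitions, a worker would instead receive its bounds from the step execution context (for instance `@Value("#{stepExecutionContext['minId']}")`, where `minId` is a hypothetical key the partitioner would have to set) and read only the matching rows. The plain-Java sketch below replaces that reader with an in-memory list (`WorkerSliceDemo` and `readSlice` are illustrative names, not Spring Batch APIs) and checks that non-overlapping ranges process every record exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/**
 * Plain-Java sketch of what a worker does with its partition: read only the records
 * whose IDs fall inside the range it was given. The in-memory list stands in for the
 * database; all names here are hypothetical.
 */
final class WorkerSliceDemo {

    /** Stand-in for "SELECT id FROM source WHERE id BETWEEN :minId AND :maxId". */
    static List<Long> readSlice(List<Long> allIds, long minId, long maxId) {
        return allIds.stream()
                .filter(id -> id >= minId && id <= maxId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> allIds = LongStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        // Ranges a range-based partitioner might hand to four workers
        long[][] ranges = {{1, 3}, {4, 6}, {7, 9}, {10, 10}};
        List<Long> processed = new ArrayList<>();
        for (int i = 0; i < ranges.length; i++) {
            List<Long> slice = readSlice(allIds, ranges[i][0], ranges[i][1]);
            System.out.println("partition" + i + " processes " + slice);
            processed.addAll(slice);
        }
        // Every record is handled exactly once across the partitions
        System.out.println("all processed once: " + processed.equals(allIds));
    }
}
```

Because each worker touches only its own key range, the four servers never read or write the same rows, which is what makes it safe to run the partitions in parallel.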
Launcher:
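import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

// Start one instance of this application on each worker server
@EnableBatchProcessing
@SpringBootApplication
@Import({BrokerConfiguration.class})
public class WorkerApplication {

    public static void main(String[] args) {
        SpringApplication.run(WorkerApplication.class, args);
    }
}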
For java - Spring Batch - Remote Partitioning, see the similar question on Stack Overflow: https://stackoverflow.com/questions/21154381/