从 RabbitMQ 连接规范列表到发布订阅 channel 上的一系列消费者的正确方法是什么?
也就是说,假设我有如下的 rabbit 配置:
rabbits:
- hostname: blahblah
vhost: blahlbah
username: blahlbah
password: blahlbalh
exchange: blahblah
- hostname: blahblah1
vhost: blahlbah1
username: blahlbah1
password: blahlbalh1
exchange: blahblah1
...
我将这些整理成一个 @NestedConfigurationProperty List<RabbitConfiguration>
.我可以写一个 @Bean
让我得到 AmqpTemplate
的方法来自其中一个 List<RabbitConfiguration>
,像这样:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public AmqpTemplate amqpTemplate(RabbitConfiguration rabbitConfiguration) {
...
}
然后我基本上可以将其映射到 IntegrationFlow
:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public IntegrationFlow integrationFlow(AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(inboundPubSubChannel()).handle(Amqp.outboundAdapter(amqpTemplate)).get();
}
然而,对于 List<RabbitConfiguration>
执行此操作的正确方法是什么?并得到结果 List<IntegrationFlow>
由 Spring 处理?当然不仅仅是:
@Bean
public List<IntegrationFlow> integrationFlows(List<RabbitConfiguration> rabbitConfigurations) {
return rabbitConfigurations.stream()
.map(this::amqpTemplate)
.map(this::integrationFlow)
.collect(toList())
}
最佳答案
我认为您要做的是根据您的配置属性动态创建类型为 IntegrationFlow 的 Spring bean。根据您希望它变得多么“神奇”或“透明”,您有很多选择。如果你想要完整的魔法,你必须实现一个 BeanFactoryPostProcessor 并在 Spring 上下文中注册。 这样的事情应该有效:
public class IntegrationFlowPostProcessor implements BeanFactoryPostProcessor{
List<RabbitConfiguration> rabbitConfigurations;
public IntegrationFlowPostProcessor(List<RabbitConfiguration> rabbitConfigurations) {
this.rabbitConfigurations = rabbitConfigurations;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
rabbitConfigurations.stream()
.forEach(rabbitConfig -> {
IntegrationFlow intFlow = integrationFlow(amqpTemplate(rabbitConfig));
beanFactory.registerSingleton(rabbitConfig.getHostanme(), intFlow);
});
}
private AmqpTemplate amqpTemplate(RabbitConfiguration rabbitConfiguration) {
//Implement here
return null;
}
private IntegrationFlow integrationFlow(AmqpTemplate amqpTemplate) {
//Implement here
return null;
}
}
然后你必须在你的配置类中注册后处理器:
@Bean public IntegrationFlowPostProcessor ifpp(List<RabbitConfiguration> config {
return new IntegrationFlowPostProcessor(config);
}
然后您就可以使用 @Qualifier
通过主机名将每个集成流注入(inject)到其他 bean 中,或者作为一个集合,比如说 List<IntegrationFlow>
所有集成流程都存在于 Spring 上下文中。
关于java - 使用 Spring Integration 进行动态配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32509887/