我很难找到一种 Spring 方法来初始化将传入消息发送到多个队列的交换 - 在我的 Spring-boot 应用程序上:
我找不到定义秒交换队列绑定(bind)的好方法。
我使用 RabbitTemplate 作为生产者客户端。
RabbitMQ 6 页教程并没有真正帮助,因为:
- 消费者按需提供的唯一几个初始临时队列(而我需要生产者进行绑定(bind) - 到持久队列)
- 这些示例用于基本的 java 用法 - 不使用 Spring 功能。
我也没有找到如何通过 spring AMQP 页面实现它。
到目前为止我得到的是尝试将基本的 java 绑定(bind)注入(inject) spring 的方式 - 但它不起作用......
@Bean
public ConnectionFactory connectionFactory() throws IOException {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection conn = connectionFactory.createConnection();
Channel channel = conn.createChannel(false);
channel.exchangeDeclare(SPRING_BOOT_EXCHANGE, "fanout");
channel.queueBind(queueName, SPRING_BOOT_EXCHANGE, ""); //first bind
channel.queueBind(queueName2, SPRING_BOOT_EXCHANGE, "");// second bind
return connectionFactory;
}
任何帮助将不胜感激
已编辑
我认为问题出现的事实是,每次我重新启动服务器时,它都会尝试重新定义交换查询绑定(bind)——而它们仍然存在于代理中…… 我设法通过代理 UI 控制台手动定义它们 - 因此生产者只知道交换名称,而消费者只知道它的相关队列。 有没有一种方法可以以编程方式定义这些元素 - 但如果在以前的重启中已经存在,那么它不会被重新定义\覆盖?
最佳答案
我们使用类似于下面的方法将数据从一个特定的输入 channel 发送到其他消费者的多个输入队列:
@Bean
public IntegrationFlow integrationFlow(final RabbitTemplate rabbitTemplate, final AmqpHeaderMapper amqpHeaderMapper) {
IntegrationFlows
.from("some-input-channel")
.handle(Amqp.outboundAdapter(rabbitTemplate)
.headerMapper(headerMapper))
.get()
}
@Bean
public AmqpHeaderMapper amqpHeaderMapper() {
final DefaultAmqpHeaderMapper headerMapper = new DefaultAmqpHeaderMapper();
headerMapper.setRequestHeaderNames("*");
return headerMapper;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory();
}
@Bean
public RabbitAdmin rabbitAdmin(final ConnectionFactory rabbitConnectionFactory) {
final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory rabbitConnectionFactory, final RabbitAdmin rabbitAdmin) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
final FanoutExchange fanoutExchange = new FanoutExchange(MY_FANOUT.getFanoutName());
fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
for (final String queueName : MY_FANOUT.getQueueNames) {
final Queue queue = new Queue(queueName, true);
queue.setAdminsThatShouldDeclare(rabbitAdmin);
final Binding binding = BindingBuilder.bind(queue).to(fanoutExchange);
binding.setAdminsThatShouldDeclare(rabbitAdmin);
}
rabbitTemplate.setExchange(fanoutExchange);
}
为了完整起见,这里是扇出声明的枚举:
public enum MyFanout {
MY_FANOUT(Lists.newArrayList("queue1", "queue2"), "my-fanout"),
private final List<String> queueNames;
private final String fanoutName;
MyFanout(final List<String> queueNames, final String fanoutName) {
this.queueNames = requireNonNull(queueNames, "queue must not be null!");
this.fanoutName = requireNonNull(fanoutName, "exchange must not be null!");
}
public List<String> getQueueNames() {
return this.queueNames;
}
public String getFanoutName() {
return this.fanoutName;
}
}
希望对您有所帮助!
关于java - RabbitMQ - Java Spring - 如何初始化交换到多个队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37906043/