java - RabbitMQ - Java Spring - 如何初始化交换到多个队列?

标签 java spring spring-boot rabbitmq

我很难找到一种 Spring 方法来初始化将传入消息发送到多个队列的交换 - 在我的 Spring-boot 应用程序上:

我找不到定义秒交换队列绑定(bind)的好方法。

我使用 RabbitTemplate 作为生产者客户端。

RabbitMQ 6 页教程并没有真正帮助,因为:

  1. 消费者按需提供的唯一几个初始临时队列(而我需要生产者进行绑定(bind) - 到持久队列)
  2. 这些示例用于基本的 java 用法 - 不使用 Spring 功能。

我也没有找到如何通过 spring AMQP 页面实现它。

到目前为止我得到的是尝试将基本的 java 绑定(bind)注入(inject) spring 的方式 - 但它不起作用......

@Bean
public ConnectionFactory connectionFactory() throws IOException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");

    Connection conn = connectionFactory.createConnection();
    Channel channel = conn.createChannel(false);

    channel.exchangeDeclare(SPRING_BOOT_EXCHANGE, "fanout");
    channel.queueBind(queueName, SPRING_BOOT_EXCHANGE, ""); //first bind
    channel.queueBind(queueName2, SPRING_BOOT_EXCHANGE, "");// second bind

    return connectionFactory;
}

任何帮助将不胜感激

已编辑

认为问题出现的事实是,每次我重新启动服务器时,它都会尝试重新定义交换查询绑定(bind)——而它们仍然存在于代理中…… 我设法通过代理 UI 控制台手动定义它们 - 因此生产者只知道交换名称,而消费者只知道它的相关队列。 有没有一种方法可以以编程方式定义这些元素 - 但如果在以前的重启中已经存在,那么它不会被重新定义\覆盖?

最佳答案

我们使用类似于下面的方法将数据从一个特定的输入 channel 发送到其他消费者的多个输入队列:

@Bean
public IntegrationFlow integrationFlow(final RabbitTemplate rabbitTemplate, final AmqpHeaderMapper amqpHeaderMapper) {
    IntegrationFlows
        .from("some-input-channel")
        .handle(Amqp.outboundAdapter(rabbitTemplate)
        .headerMapper(headerMapper))
        .get()    
}

@Bean
public AmqpHeaderMapper amqpHeaderMapper() {
    final DefaultAmqpHeaderMapper headerMapper = new DefaultAmqpHeaderMapper();
    headerMapper.setRequestHeaderNames("*");
    return headerMapper;
}

@Bean
public ConnectionFactory rabbitConnectionFactory() {
   return new CachingConnectionFactory();
}

@Bean
public RabbitAdmin rabbitAdmin(final ConnectionFactory rabbitConnectionFactory) {
    final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory rabbitConnectionFactory, final RabbitAdmin rabbitAdmin) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    final FanoutExchange fanoutExchange = new FanoutExchange(MY_FANOUT.getFanoutName());
    fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
    for (final String queueName : MY_FANOUT.getQueueNames) {
        final Queue queue = new Queue(queueName, true);
        queue.setAdminsThatShouldDeclare(rabbitAdmin);

        final Binding binding = BindingBuilder.bind(queue).to(fanoutExchange);
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
    }
    rabbitTemplate.setExchange(fanoutExchange);    
}

为了完整起见,这里是扇出声明的枚举:

public enum MyFanout {
    MY_FANOUT(Lists.newArrayList("queue1", "queue2"), "my-fanout"),

    private final List<String> queueNames;
    private final String fanoutName;

    MyFanout(final List<String> queueNames, final String fanoutName) {
        this.queueNames = requireNonNull(queueNames, "queue must not be null!");
        this.fanoutName = requireNonNull(fanoutName, "exchange must not be null!");
    }

    public List<String> getQueueNames() {
        return this.queueNames;
    }

    public String getFanoutName() {
        return this.fanoutName;
    }
}

希望对您有所帮助!

关于java - RabbitMQ - Java Spring - 如何初始化交换到多个队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37906043/

相关文章:

java - @MockBean 和 @Autowired 在一个测试类中的相同服务

Java 子类化或实现具有相同边界的泛型类型

java - 如何使用 jdbc/spring-jdbc 不使用 PGInterval 对 PostgreSQL 区间数据类型进行操作?

java - 文件 : spring-security. xml 没有嵌入样式表指令

spring - 我如何使用 spring 文档记录 java.util.Map

java - ConcurrentModificationException 使用迭代器时出错 [.next()]

java - 合并排序中的通用数组初始化

java - JSF、Spring 和 PreRenderViewEvent

java - 在 Jersey 2 中使用 Hystrix Java Servlet 和 Servlet 过滤器

java - 如何使用 Jackson 全局定义命名约定