RabbitMQ ReturnCallback gets stuck declaring a queue on NO_ROUTE (312)

Tags: rabbitmq amqp spring-amqp

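I am trying to use the publisher return callback to declare the queue and binding when no route is available, so that messages are not dropped again. I need this because my queue is auto-delete, so it is deleted whenever my consumer goes down.

However, the ReturnCallback thread hangs inside returnedMessage(), at admin.declareQueue(queue).

Debugging further, I see it is stuck in RabbitAdmin.declareQueue(), at: DeclareOk[] declare = declareQueues(channel, queue);

While this call is stuck, I can see in the management console that the queue has been declared. Also, subsequent send calls do not invoke returnedMessage, presumably because the first returnedMessage call has not yet returned.

What am I doing wrong here? Is it correct to declare the queue and binding inside the return callback?

Any help would be greatly appreciated. Thanks.

Here is my ReturnCallback: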

public class MyReturnCallback implements ReturnCallback {
    // constructor, member initialization goes here

    @Override
    public void returnedMessage(Message message, int replyCode,
            String replyText, String exchangeName, String routingKey) {
        if (replyCode == 312) {
            if (this.exchangeName.equals(exchangeName) && this.routingKey.equals(routingKey)) {
                RabbitAdmin admin = new RabbitAdmin(connectionFactory);
                Exchange exchange = new DirectExchange(exchangeName, true, false);
                Queue queue = new Queue(queueName, true, false, true);
                admin.declareQueue(queue);
                Binding binding = BindingBuilder.bind(queue).to((DirectExchange)exchange).with(routingKey); 
                admin.declareBinding(binding);
                if (null != binding) {
                    RabbitTemplate rabbitmqTemplate = new RabbitTemplate(connectionFactory);
                    logger.debug("Sending to [exchange:" + exchange.getName() + ", routing-key:" + routingKey + "]:" + message.toString());
                    rabbitmqTemplate.send(exchangeName, routingKey, message);
                }
            }
        }
    }
}

My test producer looks like this:

public class TestProducer {
    // constructor, member initialization goes here

    void initialize() {
        rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setMessageConverter(messageConverter);
        rabbitTemplate.setRoutingKey(routingKey);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new MyReturnCallback());

        Exchange exchange = new DirectExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(exchange);
        Queue queue = new Queue(queueName, true, false, true);
        rabbitAdmin.declareQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to((DirectExchange)exchange).with(routingKey); 
        rabbitAdmin.declareBinding(binding);
    }

    void send() {
        rabbitTemplate.convertAndSend(message);
    }
}

The ConnectionFactory bean is:

<rabbit:connection-factory id="rabbitmqConnectionFactory"
    host="${rabbitmq.host:localhost}"
    port="${rabbitmq.port:5672}"
    username="${rabbitmq.username}"
    password="${rabbitmq.password}"
    virtual-host="${rabbitmq.vhost:/}"
    cache-mode="CHANNEL"
    channel-cache-size="${rabbitmq.channel-cache-size:25}" 
    publisher-returns="true"/>

Debug log attached for reference:

2016-10-04 17:58:58,881 [main] INFO  CachingConnectionFactory:291 - Created new connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,883 [main] DEBUG RabbitAdmin:399 - Initializing declarations
2016-10-04 17:58:58,883 [main] DEBUG DefaultListableBeanFactory:250 - Returning cached instance of singleton bean 'org.springframework.context.annotation.ConfigurationClassPostProcessor.importRegistry'
2016-10-04 17:58:58,935 [main] DEBUG CachingConnectionFactory:453 - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:58,975 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@67f639d3
2016-10-04 17:58:58,976 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:58,976 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,979 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:58,979 [main] DEBUG RabbitAdmin:460 - Declarations finished
2016-10-04 17:58:58,980 [main] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] retrieved from cache
2016-10-04 17:58:58,981 [main] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,981 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@82de64a
2016-10-04 17:58:58,982 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:58,982 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,982 [main] DEBUG RabbitAdmin:487 - declaring Exchange 'myexchange'
2016-10-04 17:58:59,006 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:59,007 [main] INFO  TestProducer:59 - Declared/Declare-confirmed for direct exchange: myexchange
2016-10-04 17:58:59,009 [main] DEBUG TestProducer:74 - Sending to [exchange:myexchange, routing-key:mykey]:[testpayload]
2016-10-04 17:58:59,090 [main] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] retrieved from cache
2016-10-04 17:58:59,091 [main] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,091 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@793f29ff
2016-10-04 17:58:59,091 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:59,091 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,098 [main] DEBUG RabbitTemplate:1325 - Publishing message on exchange [myexchange], routingKey = [mykey]
2016-10-04 17:58:59,104 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:59,134 [AMQP Connection ip-address:5672] DEBUG TestProducer:103 - returnedMessage, replyCode: 312, replyText: NO_ROUTE
2016-10-04 17:58:59,135 [AMQP Connection ip-address:5672] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] retrieved from cache
2016-10-04 17:58:59,136 [AMQP Connection ip-address:5672] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@7ec6c641
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,164 [AMQP Connection ip-address:5672] DEBUG RabbitAdmin:515 - declaring Queue 'myqueue'
2016-10-04 17:59:29,104 [main] DEBUG TestProducer:74 - Sending to [exchange:myexchange, routing-key:mykey]:[testpayload]

Nothing happens after this.

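Best answer

Interesting problem. Because the channel is put back in the cache after the initial send, when the return is delivered on that channel, the send in the callback gets the same channel. That causes a deadlock within the channel: we are performing a declaration while we are still processing the return.

We can't really delay putting the channel back in the cache until we get the return, because we don't know whether we will get one.

I'll give some thought to whether there is any way to detect and avoid this situation but, in the meantime, the safest thing for you to do is to handle the declaration and the republish on a different thread: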

private final Executor executor = Executors.newCachedThreadPool();

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchangeName,
        String routingKey) {
    if (replyCode == 312) {
        executor.execute(() -> {
            RabbitAdmin admin = new RabbitAdmin(connectionFactory);
            Exchange exchange = new DirectExchange(exchangeName, true, false);
            Queue queue = new Queue("foo", true, false, true);
            admin.declareQueue(queue);
            Binding binding = BindingBuilder.bind(queue).to((DirectExchange) exchange).with(routingKey);
            admin.declareBinding(binding);
            if (null != binding) {
                RabbitTemplate rabbitmqTemplate = new RabbitTemplate(connectionFactory);
                System.out.println("Sending to [exchange:" + exchange.getName() + ", routing-key:" + routingKey
                        + "]:" + message.toString());
                rabbitmqTemplate.send(exchangeName, routingKey, message);
            }
        });
    }
}
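
The blocking pattern described in the answer can be illustrated by analogy with plain java.util.concurrent, without any Spring AMQP classes. The sketch below is not how Spring AMQP or the RabbitMQ client is implemented. It assumes a single "dispatcher" thread that both runs callbacks and delivers replies, and every name in it (CallbackDeadlockSketch, requestAndAwaitReply, blockInsideCallback, offloadFromCallback) is hypothetical. It uses a timeout so that the blocked case ends instead of hanging forever.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Analogy only: a single dispatcher thread runs callbacks AND delivers replies.
final class CallbackDeadlockSketch {

    // Stand-in for a blocking request such as a queue declaration: the reply
    // can only be produced by the dispatcher thread. Returns true if the
    // reply arrived within the timeout.
    static boolean requestAndAwaitReply(ExecutorService dispatcher, long timeoutMs) throws Exception {
        Future<String> reply = dispatcher.submit(() -> "declare-ok");
        try {
            reply.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            reply.cancel(false); // give up; the dispatcher never got to it
            return false;
        }
    }

    // The callback blocks on the dispatcher thread itself, so the reply it is
    // waiting for sits in the queue behind it.
    static boolean blockInsideCallback(long timeoutMs) throws Exception {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            Callable<Boolean> callback = () -> requestAndAwaitReply(dispatcher, timeoutMs);
            return dispatcher.submit(callback).get();
        } finally {
            dispatcher.shutdownNow();
        }
    }

    // The callback only hands the work to another thread and returns at once,
    // leaving the dispatcher free to deliver the reply.
    static boolean offloadFromCallback(long timeoutMs) throws Exception {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<Boolean> work = () -> requestAndAwaitReply(dispatcher, timeoutMs);
            Callable<Future<Boolean>> callback = () -> worker.submit(work);
            return dispatcher.submit(callback).get().get();
        } finally {
            worker.shutdownNow();
            dispatcher.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("reply received when blocking inside the callback: " + blockInsideCallback(300));
        System.out.println("reply received when work is handed off: " + offloadFromCallback(5000));
    }
}
```

In this sketch the first call gives up after its timeout because the only thread able to deliver the reply is the one waiting for it, while the second call completes. That is the same reason the answer moves the declaration and the republish onto an Executor.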

Original question on Stack Overflow: https://stackoverflow.com/questions/39859364/
