java - Spring AMQP - channel 交易与发布者确认

标签 java spring amqp spring-amqp

我有一个 Jersey 应用程序,我在其中使用 spring amqp 库将消息发布到 rabbitMQ 交换器。我在我的兔子模板中使用 CachingConnectionFactory,最初 Channel-Transacted 设置为 false。我注意到有些消息实际上并没有发布到交易所,所以我将 channel-transacted 值更改为 true。

在执行此操作时,我的发布功能开始耗时 500 毫秒(当交易 channel 为假时耗时 5 毫秒)。我在这里遗漏了什么,因为 500 毫秒太多了。

作为替代方案,我尝试将 publisherConfirms 设置为 true 并添加一个 ConfirmCallback。我还没有对此进行基准测试,但想知道与 channel 事务相比,这是否具有更好的性能,因为此应用程序的唯一目的是将消息发布到 RabbitMQ 中的交换?

另外,如果我使用 publisherConfirms,我想在失败的情况下实现重试,或者至少能够抛出异常。使用 channel 事务处理,如果出现故障我会得到异常,但在这种情况下延迟很高。我不确定如何使用 publisherConfirms 实现重试。

我尝试重试并确认发布者,但我的代码只是挂起。

这是我的代码:

CompleteMessageCorrelationData.java

public class CompleteMessageCorrelationData extends CorrelationData {

    private final Message message;
    private final int retryCount;

    public CompleteMessageCorrelationData(String id, Message message, int retryCount) {
        super(id);
        this.message = message;
        this.retryCount = retryCount;
    }

    public Message getMessage() {
        return this.message;
    }

    public int getRetryCount() {
        return this.retryCount;
    }

    @Override
    public String toString() {
        return "CompleteMessageCorrelationData [id=" + getId() + ", message=" + this.message + "]";
    }

}

设置缓存连接工厂:

private static CachingConnectionFactory factory = new CachingConnectionFactory("host");
static {
    factory.setUsername("rmq-user");
    factory.setPassword("rmq-password");
    factory.setChannelCacheSize(50);
    factory.setPublisherConfirms(true);
}
private final RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
        if (correlation != null && !ack) {
            CompleteMessageCorrelationData data = (CompleteMessageCorrelationData)correlation;
            log.info("Received nack for message: " + data.getMessage() + " for reason : " + reason);
            int counter = data.getRetryCount();
            if (counter < Integer.parseInt(max_retries)){
                this.rabbitTemplate.convertAndSend(data.getMessage().getMessageProperties().getReceivedExchange(),
                        data.getMessage().getMessageProperties().getReceivedRoutingKey(),
                        data.getMessage(), new CompleteMessageCorrelationData(id, data.getMessage(), counter++));
            } else {
                log.error("Max retries exceeded for message: " + data.getMessage());
            }
        }
    });

发布消息:

rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new CompleteMessageCorrelationData(id, message, 0));

所以,简而言之:

  1. 我是不是在使用 Channel-transacted 时做错了什么导致延迟如此之高?

  2. 如果我改为实现 publisherConfirms 并进行重试,考虑到除了向 rabbitmq 发布消息之外没有其他工作,我的方法有什么问题,它的性能是否会比 channel 交易更好?

最佳答案

正如您所发现的,事务很昂贵并且会显着降低性能;不过,500 毫秒似乎很高。

我认为发布者确认不会有太大帮助。在释放 servlet 线程之前,您仍然必须等待到代理的往返行程。当您发送一堆消息然后等待所有确认返回时,发布者确认很有用;但是当你只发送一条消息然后等待确认时,它可能不会比使用事务快多少。

虽然您可以尝试一下,但是代码有点复杂,尤其是如果您想要处理异常,您可以通过事务“免费”获得异常。

关于java - Spring AMQP - channel 交易与发布者确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48094738/

相关文章:

java jetty json获取请求时间和持续时间

Java 8 按多个字段分组到单个映射中

spring - 使用 Spring Boot 启动应用程序时,“jpaAuditingHandler”在 null 中定义

SpringMVC - 无法访问 index.jsp 中的 JS 和 CSS 文件

java - “字段需要一个无法找到的 bean。”使用mongodb的错误spring restful API

c++ - 链接共享库,它也链接了 cmake 中的不同共享库

c++ - 错误 : ‘POPT_ARG_ARGV’ undeclared. 我需要使用旧版 gcc 吗?

java - 多张图片在电子邮件中共享

javascript - HTML 中的有向图表示

RabbitMQ - 一个消费者是否阻塞同一队列的其他消费者?