我有一个 Jersey 应用程序,我在其中使用 spring amqp 库将消息发布到 rabbitMQ 交换器。我在我的兔子模板中使用 CachingConnectionFactory,最初 Channel-Transacted 设置为 false。我注意到有些消息实际上并没有发布到交易所,所以我将 channel-transacted 值更改为 true。
在执行此操作时,我的发布功能开始耗时 500 毫秒(当交易 channel 为假时耗时 5 毫秒)。我在这里遗漏了什么,因为 500 毫秒太多了。
作为替代方案,我尝试将 publisherConfirms 设置为 true 并添加一个 ConfirmCallback。我还没有对此进行基准测试,但想知道与 channel 事务相比,这是否具有更好的性能,因为此应用程序的唯一目的是将消息发布到 RabbitMQ 中的交换?
另外,如果我使用 publisherConfirms,我想在失败的情况下实现重试,或者至少能够抛出异常。使用 channel 事务处理,如果出现故障我会得到异常,但在这种情况下延迟很高。我不确定如何使用 publisherConfirms 实现重试。
我尝试重试并确认发布者,但我的代码只是挂起。
这是我的代码:
CompleteMessageCorrelationData.java
public class CompleteMessageCorrelationData extends CorrelationData {
private final Message message;
private final int retryCount;
public CompleteMessageCorrelationData(String id, Message message, int retryCount) {
super(id);
this.message = message;
this.retryCount = retryCount;
}
public Message getMessage() {
return this.message;
}
public int getRetryCount() {
return this.retryCount;
}
@Override
public String toString() {
return "CompleteMessageCorrelationData [id=" + getId() + ", message=" + this.message + "]";
}
}
设置缓存连接工厂:
private static CachingConnectionFactory factory = new CachingConnectionFactory("host");
static {
factory.setUsername("rmq-user");
factory.setPassword("rmq-password");
factory.setChannelCacheSize(50);
factory.setPublisherConfirms(true);
}
private final RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
if (correlation != null && !ack) {
CompleteMessageCorrelationData data = (CompleteMessageCorrelationData)correlation;
log.info("Received nack for message: " + data.getMessage() + " for reason : " + reason);
int counter = data.getRetryCount();
if (counter < Integer.parseInt(max_retries)){
this.rabbitTemplate.convertAndSend(data.getMessage().getMessageProperties().getReceivedExchange(),
data.getMessage().getMessageProperties().getReceivedRoutingKey(),
data.getMessage(), new CompleteMessageCorrelationData(id, data.getMessage(), counter++));
} else {
log.error("Max retries exceeded for message: " + data.getMessage());
}
}
});
发布消息:
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new CompleteMessageCorrelationData(id, message, 0));
所以,简而言之:
我是不是在使用 Channel-transacted 时做错了什么导致延迟如此之高?
如果我改为实现 publisherConfirms 并进行重试,考虑到除了向 rabbitmq 发布消息之外没有其他工作,我的方法有什么问题,它的性能是否会比 channel 交易更好?
最佳答案
正如您所发现的,事务很昂贵并且会显着降低性能;不过,500 毫秒似乎很高。
我认为发布者确认不会有太大帮助。在释放 servlet 线程之前,您仍然必须等待到代理的往返行程。当您发送一堆消息然后等待所有确认返回时,发布者确认很有用;但是当你只发送一条消息然后等待确认时,它可能不会比使用事务快多少。
虽然您可以尝试一下,但是代码有点复杂,尤其是如果您想要处理异常,您可以通过事务“免费”获得异常。
关于java - Spring AMQP - channel 交易与发布者确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48094738/