我有以下代码来声明一个队列:
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);
和以下获取下一个 Delivery 对象并处理它:
Delivery delivery = null;
T queue = null;
//loop over, continuously retrieving messages
while(true) {
try {
delivery = consumer.nextDelivery();
queue = deserialise(delivery.getBody());
process(queue);
} catch (ShutdownSignalException e) {
logger.warn("Shutodwon signal received.");
break;
} catch (ConsumerCancelledException e) {
logger.warn("Consumer cancelled exception: {}",e.getMessage());
break;
} catch (InterruptedException e) {
logger.warn("Interuption exception: {}", e);
break;
}
}
反序列化代码。如您所见,我正在使用 Kryo:
public T deserialise(byte[] body) {
Kryo kryo= new Kryo();
Input input = new Input(body);
T deserialised = kryo.readObject(input, getQueueClass());
input.close();
return deserialised;
}
如果我在包含大量对象的队列中运行此程序,在大约 270 万个对象之后我会遇到内存不足异常。我最初是通过整夜运行它发现它的,数据从 JMeter 以 ~90/s 的速率传入,起初它没有任何问题地消耗,但在早上我注意到 RabbitMQ 中有大量数据并且出现内存不足异常消费者。我再次运行它并使用 Eclipse 内存分析器来确定内存的使用位置。从这里我可以看到 com.rabbitmq.client.QueueingConsumer 引用的 java.util.concurrent.LinkedBlockingQueue 越来越大,直到内存不足。
我需要做些什么来告诉 Rabbit 释放资源吗?
我可以增加堆大小,但我担心这只是一个短期修复,我的代码中可能有一些东西可能会在生产部署几个月后导致内存泄漏。
最佳答案
我的错误是我将 channel 设置为自动确认。这意味着来自 Rabbit 的每条消息都会得到确认(确认已收到)。我已经通过将 channel 声明为不自动确认来修复(并测试)了此问题: consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
.
这就是我的队列 decleration 现在的样子:
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), false,consumer);
和以下处理队列:
Delivery delivery = null;
T queue = null;
//loop over, continuously retrieving messages
while(true) {
try {
delivery = consumer.nextDelivery();
queue = deserialise(delivery.getBody());
process(queue);
consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (ShutdownSignalException e) {
logger.warn("Shutodwon signal received.");
break;
} catch (ConsumerCancelledException e) {
logger.warn("Consumer cancelled exception: {}",e.getMessage());
break;
} catch (InterruptedException e) {
logger.warn("Interuption exception: {}", e);
break;
} catch (IOException e) {
logger.error("Could not ack message: {}",e);
break;
}
}
我现在可以在 RabbitMQ 管理屏幕中看到消息正在以非常高的速率传送,但它们没有以该速率被确认。如果我然后杀死我的消费者,在大约 30 秒内,所有这些未确认的消息都将移回就绪队列。我将进行的改进之一是设置 basicQos 值:channel.basicQos(10);
这样就不会有太多的消息被传递但未被确认。这是可取的,因为这意味着我可以将另一个消费者启动到同一个队列并开始处理该队列,而不是全部以未确认的方式在内存中结束并且对其他消费者不可用。
关于java - RabbitMQ QueueingConsumer 可能的内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12687368/