这是我正在尝试做的事情:
- 出列消息
- 对消息执行操作
- 如果操作失败,将消息放回队列
- 如果操作成功,确认消息
我现在的问题是,如果操作失败,消息不会重新排队,但会保持未确认状态。如果我进入 RabbitMQ Web 配置界面,我会看到消息被标记为未确认,即使 basic.Nack 已被跳过。
var delivery = subscription.Next();
var messageBody = delivery.Body;
try
{
action.Invoke(messageBody);
subscription.Ack(delivery);
}
catch (Exception ex)
{
subscription.Model.BasicNack(delivery.DeliveryTag, false, true);
throw ex;
}
更新:
所以我注意到消息从“就绪”到“未确认”的速度非常快。比我实际调用 subscriber.Next() 的速度快得多,就好像 .Net 客户端将所有消息缓存在内存中(我的应用程序的内存足迹实际上增长得非常快),并从内存中处理这些消息并在之后发送 Ack(),取消标记来自 Unacknowledged 的消息。
更新 2:
似乎队列被清空得非常快是因为我没有在我的模型上设置 BasicQos。以下修复了一切。 Basic.Nack() 似乎仍然不起作用:
Model.BasicQos(0, 1, false)
最佳答案
我怀疑你正在使用:
channel.BasicConsume(your_queue_name, false, consumer);
检索消息。
我用 RabbitMQ 3.2.4 服务器和客户端运行了几个测试。我无法获得 channel.BasickAck(...)
或 channel.BasicNack(...)
按预期工作。
也就是说,我能够得到预期的 Ack |我使用时的 Nack 行为:
BasicGetResult result = channel.BasicGet(your_queue_name, false);
因此您可能需要考虑使用不同的检索方法来获取消息。我意识到 Consume 和 Dequeue 是“首选”方法,但它们在我的案例中不起作用。我想要公平的、一次一个的发送并带有确认。使用 BasicGet 是我实现这一目标的唯一方法。
这种方法的缺点是您可能会丢失与 subscription.Next()
一起使用的客户端事件迭代器。 .
如果我不得不冒险猜测,我认为本地队列集合的某些内容正在扰乱 channel 提供确认的能力。值得指出的是,使用 new QueueingBasicConsumer(channel);
创建消费者触发从服务器队列中预取事件的调用。消费者的队列只是一个 SharedQueue<RabbitMQ.Client.Events.BasicDeliverEventArgs>
而 SharedQueue 只是 IEnumerable 的扩展。
另请记住,拉取消息的同一 channel 需要提供 Ack |纳克。 你不能确认 |拒绝来自不同 channel 的消息。或者至少我还没有弄清楚该怎么做,其他人也没有。如果您将 RabbitMQ 对象包装在 using 语句中(这样您就不会留下网络资源),那将是一个问题并且在您可以安全确认之前,您需要运行很长的过程.
这SO Answer制定了一个体面的工作流程来解决可能的现实,即您的拉取 channel 不会将成为发送 Ack 的 channel |纳克。诀窍是设置 TTL 而不是发送 Nack - 只需让新消息过期并自动重新排队。
关于c# - basic.Nack 未被处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21244281/