我正在尝试对消费者失败实现指数退避。为此,我有三个带有 DLX 的队列:RETRY -> MAIN -> FAILED
.
任何被 MAIN
拒绝的内容去FAILED
,以及添加到 RETRY
的任何内容进入MAIN
每条消息的 TTL 之后。消费者收到来自 MAIN
.
我已经实现了 ErrorHandler
并将其设置为 SimpleRabbitListenerContainerFactory
。该处理程序计算新的 TTL 并将消息发送到 RETRY
队列,或抛出 AmqpRejectAndDontRequeueException
如果这是不可能的或超过重试次数以便将其 DLX 到 FAILED
。问题是,我无法弄清楚如何删除原始消息。
据我所知,我必须确认它,但是 Channel
在错误处理程序中不可用,并且没有其他异常可以引发会触发确认。
如果我删除 MAIN -> FAILED
DLX 并切换为手动将消息添加到 FAILED
,那么如果这不起作用,我就丢失了消息。
@Override
public void handleError(Throwable t) {
log.warn("Execution of Rabbit message listener failed.", t);
try {
queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
// what to do here?
} catch (RuntimeException ex) {
t.addSuppressed(ex);
log.error("Not requeueing after failure", t);
throw new AmqpRejectAndDontRequeueException(t);
}
// or here?
}
最佳答案
我想我立即找到了答案。之前错过了,因为我从错误的地方扔了。
@Override
public void handleError(Throwable t) {
log.warn("Execution of Rabbit message listener failed.", t);
try {
queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
} catch (RuntimeException ex) {
t.addSuppressed(ex);
log.error("Not requeueing after failure", t);
throw new AmqpRejectAndDontRequeueException(t);
}
throw new ImmediateAcknowledgeAmqpException("Queued for retry");
}
ImmediateAcknowledgeAmqpException
Special exception for listener implementations that want to signal that the current batch of messages should be acknowledged immediately (i.e. as soon as possible) without rollback, and without consuming any more messages within the current transaction.
这应该是安全的,因为我没有使用批处理或交易,仅使用发布商返回。
<小时/>旁注:我还应该意识到指数退避实际上不会正常工作:
While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired.
关于java - 如何将消费者异常作为确认来处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41462515/