java - 如何将消费者异常作为确认来处理?

标签 java spring rabbitmq spring-amqp

我正在尝试对消费者失败实现指数退避。为此,我有三个带有 DLX 的队列:RETRY -> MAIN -> FAILED .

任何被 MAIN 拒绝的内容去FAILED ,以及添加到 RETRY 的任何内容进入MAIN每条消息的 TTL 之后。消费者收到来自 MAIN .

我已经实现了 ErrorHandler并将其设置为 SimpleRabbitListenerContainerFactory 。该处理程序计算新的 TTL 并将消息发送到 RETRY队列,或抛出 AmqpRejectAndDontRequeueException如果这是不可能的或超过重试次数以便将其 DLX 到 FAILED 。问题是,我无法弄清楚如何删除原始消息。

据我所知,我必须确认它,但是 Channel在错误处理程序中不可用,并且没有其他异常可以引发会触发确认。

如果我删除 MAIN -> FAILED DLX 并切换为手动将消息添加到 FAILED ,那么如果这不起作用,我就丢失了消息。

@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
    // what to do here?
  } catch (RuntimeException ex) {
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }
  // or here?
}

最佳答案

我想我立即找到了答案。之前错过了,因为我从错误的地方扔了。

@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
  } catch (RuntimeException ex) {
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }

  throw new ImmediateAcknowledgeAmqpException("Queued for retry");
}

ImmediateAcknowledgeAmqpException

Special exception for listener implementations that want to signal that the current batch of messages should be acknowledged immediately (i.e. as soon as possible) without rollback, and without consuming any more messages within the current transaction.

这应该是安全的,因为我没有使用批处理或交易,仅使用发布商返回。

<小时/>

旁注:我还应该意识到指数退避实际上不会正常工作:

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired.

关于java - 如何将消费者异常作为确认来处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41462515/

相关文章:

java - 从 URL 中的 JSON 创建 Java 数组

java - 打印数组元素而不重复元素

java - 未抛出 Spring 的自定义异常

java - 如何获取RabbitMQ中某个主题的订阅数量

go - tls : handshake failure when enabling tls for RabbitMQ with streadway/amqp

Java:尝试忽略(或跳过)不是行尾的回车符(\n)

java - 使用 JSON 编码字符串

java - Spring mvc bean 在没有 @Controller 声明的情况下充当 Controller

java - 无法 Autowiring 字段,但我有定义

c# - RabbitMQ 中工作队列的请求-响应模式