java - Apache Camel 与 RabbitMQ : messages in temp reply queue not being acked when autoAck=false on endpoint configuration

标签 java rabbitmq apache-camel

在使用 camel-rabbitmq 扩展配置 Camel 中支持 InOut 的路由时,我注意到了一个问题。 当我将主队列配置设置为 autoAck=false 时,同样的配置也被复制到临时回复队列(它甚至使用相同的 prefetch(5) 设置,在 RabbitMQ 控制台中很容易看到)。这会导致临时队列中的消息无限期地坐在那里,直到服务器重新启动。

Virtual host Name                         Features  State Ready Unacked Total incoming deliver / get ack
/test      amq.gen-Hkdx9mckIfMc6JhDI6d-JA AD Excl   idle    2   5   7   0.00/s 0.00/s   0.00/s
/test      amq.gen-eUU7BRI3Ooo4F8Me7HrPnA AD Excl   idle    2   5   7   0.00/s 0.00/s   0.00/s

即使在日志中我可以清楚地看到收到了回复消息,只是 ack 似乎没有发送到 RabbitMQ 以从临时队列中清除我们的消息。而且我在控制台中检查了两个临时队列都有消费者,所以我希望 Camel 发送 ack。

o.a.c.c.r.RabbitMQMessagePublisher  - Sending message to exchange: emailfeedbackExchange with CorrelationId = Camel-ID-VMS-1534332570964-0-11
o.a.c.c.r.r.ReplyManagerSupport  - Received reply message with correlationID [Camel-ID-VMS-1534332570964-0-11]

问题是,如何在保持我的 autoAck=false 和 InOut 可用路由的同时防止这种情况发生? 也许我应该在这里提到没有错误或类似的东西,流程按预期工作并且电子邮件处理工作完美,唯一的问题是临时队列中的陈旧消息。

我们的 Camel 版本是 2.20.2 这是我们拥有的所有 Camel 组件的相关 Gradle 配置:

compile ("org.apache.camel:camel-spring-boot-starter:${camelVersion}")
compile ("org.apache.camel:camel-rabbitmq:${camelVersion}")
compile ("org.apache.camel:camel-amqp:${camelVersion}")

队列和路由配置:

restentrypointroute:
    restEndpoint: /app
    postEndpoint: /email
    outputEmailEndpoint: rabbitmq://vms:5672/emailExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailrouteQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&channelPoolMaxSize=3&prefetchCount=5&prefetchEnabled=true

emailroutebuilder:
    serviceName: emailroutebuilder
    inputEndpoint: rabbitmq://vms:5672/emailExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailrouteQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&channelPoolMaxSize=3&prefetchCount=5&prefetchEnabled=true
    emailProcessor: bean:emailProcessor
    maximumRedeliveries: 5
    redeliveryDelay: 30000

这是 RestRouteBuilder 实现中的相关部分:

@Override
public void configure() throws Exception {

restConfiguration().component("restlet").bindingMode(RestBindingMode.json);

    rest(restEndpoint).post(postEndpoint)
      .type(MyRequest.class)
      .route()
      .startupOrder(Integer.MAX_VALUE - 2)
      .process(this::process)
      .choice()
      .when(header(DELIVERYSTATUS_HEADER)
          .isEqualTo(Status.GENERATED)).to(outputEmailEndpoint)
      .when(header(DELIVERYSTATUS_HEADER)
          .isEqualTo(Status.COMPLETED)).to(outputEmailEndpoint, outputArchiveEndpoint).end()
      .endRest();

process() 方法将 DELIVERYSTATUS_HEADER header 添加到 Camel 交换并验证有效负载。

EmailRouteBuilder 如下所示:

public void configure() throws Exception {
    super.configure();

    from("direct:" + getServiceName())
            .to(emailProcessor)
            .process(ex -> {
                ex.setOut(ex.getIn());

            });

}

super.configure() 调用中配置异常处理和死信、启动顺序、重试次数、最大重新交付等。那里的代码相当多,但如果你想点什么可能是这个问题的原因,我会把它贴出来。另外,如果您需要我添加任何其他配置,请告诉我。

从上面可以清楚地看出为什么我们需要一个带有 autoAck=falseInOut 路由,因为从业务角度来看,丢失电子邮件是不好的,REST 客户端需要一个基于 EmailProcessor 的响应方式。如何摆脱临时队列中的陈旧消息?

编辑 实际上,该路由只有在预取计数用完之前才能正常工作,之后它开始抛出异常并且 REST 客户端收到 HTTP 500 响应。

org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-VMSYS119-1534407032085-0-284 not received on destination: amq.gen-eUU7BRI3Ooo4F8Me7HrPnA.

最佳答案

根据评论,这原来是 camel-rabbitmq 组件中的一个错误,现在已将修复应用到 master 分支。

此处为 Jira 票证:https://issues.apache.org/jira/browse/CAMEL-12746

此修复将在 2.21.3、2.22.1、2.23.0 及更高版本中提供。

编辑:

在答案中包含代码更改。

TemporaryQueueReplyManager 第 139 行 - 始终使用 true 的自动确认模式启动临时队列的消费者。

改变这个:

private void start() throws IOException {         
    tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);     
}

对此:

private void start() throws IOException {
     tag = channel.basicConsume(getReplyTo(), true, this);
 }

关于java - Apache Camel 与 RabbitMQ : messages in temp reply queue not being acked when autoAck=false on endpoint configuration,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51875646/

相关文章:

java - 在 Spring 启动应用程序启动时获取为MethodArgumentNotValidException映射的模糊@ExceptionHandler方法

java - 将 UMLS 用户名和密码添加到 Ctakes

java - 如何使用camel-avro-消费者和生产者?

java - 在哪里可以找到 CPLEX 库(不是 AIMMS 的 GUI)?

java - java中char数组到String数组的转换

node.js - RabbitMQ 和 Node amqp : Exchange in confirmed mode does not confirm - why?

ssl - Kubernetes 上的 RabbitMQ kubernetes.default.svc.cluster.local tls qlert

rabbitmq:消费者可以在 nack 之前坚持消息更改吗?

spring - Camel Spring EMS SSL 不工作

java - 如何为 REST Web 服务创建 SOAP 前端