java - Spring-amqp - 消息处理延迟

标签 java spring rabbitmq spring-amqp

我们在 RHEL 7.0 VM 上部署了一个 Java/spring/tomcat 应用程序,该应用程序使用 AlejandroRivera/embedded-rabbitmq,一旦部署了 war,它就会启动 Rabbitmq 服务器并连接到它。我们有多个队列用于处理和过滤事件。

流程是这样的:

我们收到的事件 -> 发布事件队列 -> 监听类过滤事件 -> 发布到另一个队列进行处理 -> 我们发布到另一个队列进行日志记录。

问题是:

  • 处理正常开始,我们可以看到消息流经队列,但一段时间后监听器类停止接收事件。看起来我们能够将其发布到 RabbitMQ channel ,但它从未从队列中发送到监听器。 这似乎开始恶化,导致事件在一段时间后被处理,上升到几分钟。负载并没有那么高,大约有 200 个事件,我们只关心其中的一小部分。

我们尝试过的:

  • 最初,队列预取设置为 1,消费者最小值为 2,最大值为 5,我们删除了预取,并添加了更多消费者作为最大并发设置,但问题仍然存在,延迟只是需要更长的时间才能呈现,但几分钟后,处理开始需要大约 20/30 秒。

我们在日志中看到我们将事件发布到队列,并且我们在日志中看到我们将事件从队列中延迟了。因此,我们的代码中间没有运行任何东西来产生这种延迟。

据我们所知,其余队列似乎都能正确处理消息,但正是这个队列陷入了这种卡住模式。

我看到的错误如下,但我很确定它的含义以及是否相关:

Jun  4 11:16:04  server: [pool-3-thread-10] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Consumer org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer@70dfa413 (amq.ctag-VaWc-hv-VwcUPh9mTQTj7A) method handleDelivery for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198) threw an exception for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198)
Jun  4 11:16:04  server: java.io.IOException: Unknown consumerTag
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1266)
Jun  4 11:16:04  server: at sun.reflect.GeneratedMethodAccessor180.invoke(Unknown Source)
Jun  4 11:16:04  server: at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun  4 11:16:04  server: at java.lang.reflect.Method.invoke(Method.java:498)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955)
Jun  4 11:16:04  server: at com.sun.proxy.$Proxy119.basicCancel(Unknown Source)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:846)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Jun  4 11:16:04  server: at java.lang.Thread.run(Thread.java:748)

这种情况发生在应用程序关闭时,但我见过它发生在应用程序仍在运行时..

2018-06-05 13:22:45,443 ERROR CachingConnectionFactory$DefaultChannelCloseLogger - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 109, class-id=60, method-id=120)

我不确定如何解决这两个错误,也不知道它们是否相关。

这是我的 Spring 配置:

<!-- Queues -->
<rabbit:queue id="monitorIncomingEventsQueue" name="MonitorIncomingEventsQueue"/>
<rabbit:queue id="interestingEventsQueue" name="InterestingEventsQueue"/>
<rabbit:queue id="textCallsEventsQueue" name="TextCallsEventsQueue"/>
<rabbit:queue id="callDisconnectedEventQueue" name="CallDisconnectedEventQueue"/>
<rabbit:queue id="incomingCallEventQueue" name="IncomingCallEventQueue"/>
<rabbit:queue id="eventLoggingQueue" name="EventLoggingQueue"/>

<!-- listeners -->
<bean id="monitorListener" class="com.example.rabbitmq.listeners.monitorListener"/>
<bean id="interestingEventsListener" class="com.example.rabbitmq.listeners.InterestingEventsListener"/>
<bean id="textCallsEventListener" class="com.example.rabbitmq.listeners.TextCallsEventListener"/>
<bean id="callDisconnectedEventListener" class="com.example.rabbitmq.listeners.CallDisconnectedEventListener"/>
<bean id="incomingCallEventListener" class="com.example.rabbitmq.listeners.IncomingCallEventListener"/>
<bean id="eventLoggingEventListener" class="com.example.rabbitmq.listeners.EventLoggingListener"/>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="40" acknowledge="none">
    <rabbit:listener queues="interestingEventsQueue" ref="interestingEventsListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="textCallsEventsQueue" ref="textCallsEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="callDisconnectedEventQueue" ref="callDisconnectedEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="30" acknowledge="none">
    <rabbit:listener queues="incomingCallEventQueue" ref="incomingCallEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="1" max-concurrency="3" acknowledge="none">
    <rabbit:listener queues="monitorIncomingEventsQueue" ref="monitorListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="10"  acknowledge="none">
    <rabbit:listener queues="EventLoggingQueue" ref="eventLoggingEventListener" method="handleLoggingEvent"/>
</rabbit:listener-container>

<rabbit:connection-factory id="connectionFactory" host="${host.name}" port="${port.number}" username="${user.name}" password="${user.password}" connection-timeout="20000"/>

我在这里读到,处理延迟可能是由网络问题引起的,但在这种情况下,服务器和应用程序位于同一虚拟机上。这是一个锁定的环境,因此大多数端口都没有打开,但我怀疑这就是问题所在。

更多日志:https://pastebin.com/4QMFDT7A

感谢任何帮助,

谢谢

最佳答案

我需要查看更多的日志 - 这是确凿的证据:

Storing...Storing delivery for Consumer@a2ce092: tags=[{}]

(消费者)tags 为空,这意味着消费者当时已经被取消(由于某种原因,它应该出现在日志的前面)。

如果您有机会使用 1.7.9.BUILD-SNAPSHOT 进行重现,我添加了一些 TRACE 级别日志记录,这应该有助于诊断此问题。

编辑

回复您最近对rabbitmq-users的评论...

您可以尝试使用固定并发吗? Spring AMQP 容器中的可变并发通常不是很有用,因为消费者通常只有在整个容器空闲一段时间后才会减少。

但是,这可能可以解释为什么您会看到消费者被取消。

也许该逻辑中存在一些竞争条件;使用固定数量的消费者(不指定最大...)将避免这种情况;如果你能尝试一下,至少会消除这种可能性。

也就是说,我很困惑(我在你的 Stack Overflow 配置中没有注意到这一点);使用 acknowledge="none" 时,不应向代理发送任何确认(NONE 用于设置 autoAck)

String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), ...

public boolean isAutoAck() {

    return this == NONE;

}

你正在从你的代码发送acks吗?如果是这样,确认模式应该是手动。我看不到容器会发送 NONE ack 模式的 ack 的场景。

关于java - Spring-amqp - 消息处理延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50719362/

相关文章:

java - 如何使用 Ansible 安装 Oracle Java 8

java - Seam 3 教程,Spring 3 + Seam 3 集成

node.js - AMQPLIB - NodeJS - 在 RabbitMQ 中断言惰性队列

javascript - 列出绑定(bind)到 RabbitMQ 中交换器的所有队列

java - GroovyScriptEngine : load groovy scripts from subfolder

java - kubernetes 上的 Spark 作业失败,没有特定错误

Spring + hibernate 与 Spring Data JPA : Are they different?

java - 多线程多 channel 发布时线程全部被阻塞,rabbitmq

java - 使用 guice 构造函数参数注入(inject)

spring - 用 Thymeleaf 扩展视野