java - RabbitMQ and Camel: route interrupted due to "Message dropped on recovery"

Tags: java rabbitmq message-queue apache-camel spring-rabbit

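I've run into a frustrating problem using RabbitMQ with spring-amqp. I need to send messages to a queue from an external process (the JUnit class exists only to test that the route works). This is my camel-context.xml file: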

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route> 
        <from uri="spring-amqp:KipcastDirect:KipcastQueue:KipcastRouting?type=direct&amp;autodelete=true&amp;durable=true" />
        <log message="Message received!!! "/> 
        <to   uri="spring-amqp:KipcastDirect2:TestQueue:KipcastRouting2?type=direct&amp;autodelete=false&amp;durable=true" />
    </route>
</camelContext>

<rabbit:connection-factory id="amqpConnectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" message-converter="messageConverter" exchange="KipcastBean" />
<rabbit:admin connection-factory="amqpConnectionFactory"/>

<bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="10.211.55.20"/>
    <property name="port" value="5672"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="virtualHost" value="/"/>
</bean>

<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>

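When I start it with Maven's camel:run, it works fine: the exchange is available and the queue also shows up in the RabbitMQ management console. The problem appears when I try to send a message to that exchange: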

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.20");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare("KipcastDirect", "direct", 
       true,    /* durable */
       true,    /* autodelete */
       null);   /* arguments */

byte[] messageBodyBytes = "Hello, world!".getBytes();

// Build the properties once. The builder keeps earlier settings, so the
// original three build() calls ended up with exactly these values.
AMQP.BasicProperties minBasic = new AMQP.BasicProperties.Builder()
        .priority(0)
        .messageId("Test")
        .deliveryMode(1)   // 1 = non-persistent
        .build();

while (true) {

    channel.basicPublish("KipcastDirect", "KipcastRouting", minBasic, messageBodyBytes);
    System.out.println(" [x] Sent ");

}

The messages are delivered to the queue correctly (I can see them in the log), but an exception is thrown and the route stops:

[     SimpleAsyncTaskExecutor-1] SpringAMQPConsumer             WARN  Caused by: [org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException - Listener threw exception]
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:]
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37]
    at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at $Proxy46.invokeListener(Unknown Source)[:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:]
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_37]
[     SimpleAsyncTaskExecutor-1] erationsInterceptorFactoryBean WARN  Message dropped on recovery: (Body:'Hello, world!'; ID:Test; Content:text/plain; Headers:{}; Exchange:KipcastDirect; RoutingKey:KipcastRouting; Reply:null; DeliveryMode:NON_PERSISTENT; DeliveryTag:2)
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_37]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_37]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37]
    at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at $Proxy46.invokeListener(Unknown Source)[:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:]
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_37]

What is wrong here? And why do I have to generate a message ID?

Best answer

The problem was related to the messageConverter bean:

<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>

which was replaced with

<bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
<bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
<bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
    <property name="converters">
        <map>
            <entry key="application/json" value-ref="jsonMessageConverter"/>
            <entry key="application/xml" value-ref="textMessageConverter"/>
        </map>
    </property>
    <property name="fallbackConverter" ref="textMessageConverter"/>
</bean>
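
To make the effect of this configuration concrete: the log in the question shows the test message arriving with Content:text/plain, which matches neither map key, so it would be handed to the fallback converter. Below is a minimal plain-Java sketch of that kind of selection logic. It is a simplified model and not the camel-spring-amqp implementation; the class ContentTypeDispatchSketch, its methods, and the stand-in "structured" converter are all hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of content-type based converter selection.
// It is NOT the camel-spring-amqp ContentTypeConverterFactory source.
final class ContentTypeDispatchSketch {

    private final Map<String, Function<byte[], Object>> converters = new HashMap<>();
    private final Function<byte[], Object> fallback;

    ContentTypeDispatchSketch(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    void register(String contentType, Function<byte[], Object> converter) {
        converters.put(contentType, converter);
    }

    // Pick the converter registered for the content type, else the fallback.
    Object fromMessage(String contentType, byte[] body) {
        return converters.getOrDefault(contentType, fallback).apply(body);
    }

    // Mirrors the shape of the XML above: two map entries plus a fallback.
    static ContentTypeDispatchSketch likeAnswerConfig() {
        Function<byte[], Object> text = b -> new String(b, StandardCharsets.UTF_8);
        // Assumed stand-in for a structured converter: rejects anything that
        // does not look like a JSON object. Real converters behave differently.
        Function<byte[], Object> structured = b -> {
            String s = new String(b, StandardCharsets.UTF_8).trim();
            if (!s.startsWith("{")) {
                throw new IllegalArgumentException("not structured data: " + s);
            }
            return s;
        };
        ContentTypeDispatchSketch factory = new ContentTypeDispatchSketch(text);
        factory.register("application/json", structured);
        factory.register("application/xml", text);
        return factory;
    }

    public static void main(String[] args) {
        byte[] body = "Hello, world!".getBytes(StandardCharsets.UTF_8);
        // text/plain has no entry, so the fallback (text) converter is used.
        System.out.println(likeAnswerConfig().fromMessage("text/plain", body));
    }
}
```

In this model a plain-text body only fails when it is forced through the structured converter, which is presumably what happened when a single XStreamConverter handled every message.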

This solved the problem, and messages are now routed correctly.
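
On the message-ID part of the question: the stack trace passes through StatefulRetryOperationsInterceptor, and a stateful retry has to recognize a redelivered message in order to count its attempts, which is most likely why an ID is required. The sketch below models that idea in plain Java. It is an assumption-laden illustration, not Spring Retry's code: the class StatefulRetrySketch, the Outcome enum, and the attempt accounting are hypothetical and may differ from what the library actually does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of stateful retry keyed by message ID.
// It is NOT the Spring Retry / Spring AMQP implementation.
final class StatefulRetrySketch {

    enum Outcome { HANDLED, REDELIVER, DROPPED }

    private final int maxAttempts;
    // Failed attempts so far, remembered across redeliveries by message ID.
    private final Map<String, Integer> attemptsById = new HashMap<>();

    StatefulRetrySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    Outcome onDelivery(String messageId, String body, Consumer<String> listener) {
        if (messageId == null) {
            // Without an ID a redelivery cannot be matched to earlier failures.
            throw new IllegalArgumentException("cannot track retries without a message ID");
        }
        int attempts = attemptsById.getOrDefault(messageId, 0);
        if (attempts >= maxAttempts) {
            // Retries exhausted: recover by dropping the message.
            attemptsById.remove(messageId);
            return Outcome.DROPPED;
        }
        try {
            listener.accept(body);
            attemptsById.remove(messageId);
            return Outcome.HANDLED;
        } catch (RuntimeException e) {
            attemptsById.put(messageId, attempts + 1);
            return Outcome.REDELIVER;
        }
    }

    public static void main(String[] args) {
        StatefulRetrySketch retry = new StatefulRetrySketch(3);
        Consumer<String> failing = b -> { throw new IllegalStateException("listener threw"); };
        Outcome outcome;
        do {
            outcome = retry.onDelivery("Test", "Hello, world!", failing);
            System.out.println(outcome);
        } while (outcome == Outcome.REDELIVER);
    }
}
```

Note that the sender in the question stamps every message with the same ID, Test, so in a scheme like this one all of those messages would share a single retry counter; a unique ID per message avoids that.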

Regarding java - RabbitMQ and Camel: route interrupted due to "Message dropped on recovery", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/14279365/
