java - AMQP Spring Integration 错误处理

标签 java spring spring-integration amqp spring-amqp

我有一个如下所示的集成流程:

@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
                                 @Qualifier("writeChannel") MessageChannel messageChannel,
                                 @Qualifier("parseErrorChannel") MessageChannel errorChannel) {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory, auditQueue)
                    .errorChannel(errorChannel)
                    .concurrentConsumers(numConsumers)
                    .messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
            .enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
                    o -> getNamespace(o.getPayload())))
            .channel(messageChannel)
            .get();
}

如果引入格式错误的消息,消息转换器当然会出错,引发 MessageConversionException。在这种情况下,我当然不希望消息重新排队——但我也不想将默认设置为不重新排队被拒绝的消息,就像我在 AmqpInboundAdapterSpec 上所做的那样。什么是正确的方法来避免重新排队以这种方式出错的消息(并以其他方式重新发布它们以进行调试)?

更一般地说,同一流中的下游进程可能会因语义格式更错误的数据而出错——再一次,我不想对它们重新排队。我可以在那些时候抛出 AmqpRejectAndDontRequeueException,但随后我失去了关注点分离,这只是重点的一半。处理这些异常的正确方法是什么——也许有一种方法可以转换为 AmqpRejectAndDontRequeueException

最佳答案

入站适配器的 SimpleMessageListenerContainer 中的默认错误处理程序(一个 ConditionalRejectingErrorHandler)就是这样做的(它检测到一个 MessageConversionException 并抛出一个 AmqpRejectAndDontRequeueException).

您可以通过注入(inject)自己的 FatalExceptionStrategy 来自定义 ConditionalRejectingErrorHandler 以查找其他异常类型并以相同的方式处理它们。

DefaultExceptionStrategy 看起来像这样......

@Override
public boolean isFatal(Throwable t) {
    if (t instanceof ListenerExecutionFailedException
            && t.getCause() instanceof MessageConversionException) {
        if (logger.isWarnEnabled()) {
            logger.warn("Fatal message conversion error; message rejected; "
                    + "it will be dropped or routed to a dead letter exchange, if so configured: "
                    + ((ListenerExecutionFailedException) t).getFailedMessage(), t);
        }
        return true;
    }
    return false;
}

只有在原因链中还没有 AmqpRejectAndDontRequeueException 时才会调用它。

关于java - AMQP Spring Integration 错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32767116/

相关文章:

java - 用逗号分隔 HashMap 输出到 .csv

java - Spring Integration 多个 UDP 入站/出站 channel

java - Spring集成TCP服务器: Get the Ip address & the payload

java - 为什么maven没有将某些jar添加到eclipse中的.classpath文件中?

java - 设置在另一个类中分配的私有(private)变量的问题

java - 黑莓编程——定时器类

database - Spring - PersistenceContext - 没有可用的事务性 EntityManager

java - 欢迎文件列表在 jetty + Spring 中不起作用

java - 创建类路径资源中定义的名称为 'sessionRepositoryFilterRegistration' 的 bean 时出错

spring-boot - 使用 Spring boot 和集成 DSL,出现错误 ClassNotFoundException integration.history.TrackableComponent