Spring Integration with RabbitMQ: JSON MessagingException

Tags: spring, spring-integration

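I am using Spring Integration to send notifications. As an error test case, I send malformed JSON (a map) and get a MessagingException. The exception then repeats indefinitely and never stops, so I have to kill the application.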

I would like to know how to catch this, possibly through an errorChannel. A code example would be very helpful.

My Spring Integration configuration:

 <!-- channel to connect to disruption exchange -->
    <int-amqp:publish-subscribe-channel id="inputChannel"
                                        connection-factory="connectionFactory"
                                        exchange="notification.exchange"/>

    <int:json-to-object-transformer input-channel="inputChannel"
                                    output-channel="notificationChannel"
                                    type="java.util.Map"/>

    <int:channel id="notificationChannel">
        <int:interceptors>
            <int:wire-tap channel="loggingChannel"/>
        </int:interceptors>
    </int:channel>

    <int:logging-channel-adapter id="loggingChannel" log-full-message="true" logger-name="tapInbound" level="INFO"/>

    <!-- depending on the deviceType route to either apnsChannel or gcmChannel -->
    <int:router ref="notificationTypeRouter" input-channel="notificationChannel"/>

    <!-- apple push notification channel-->
    <int:channel id="apnsChannel"/>

    <!-- service activator to process disruptionNotificationChannel -->
    <int:service-activator input-channel="apnsChannel" ref="apnsPushNotificationService" method="pushNotification"/>

    <!-- google cloud messaging notification channel-->
    <int:channel id="gcmChannel"/>

    <!-- service activator to process disruptionNotificationChannel -->
    <int:service-activator input-channel="gcmChannel" ref="gcmPushNotificationService" method="pushNotification"/>

    <!-- error channel; may log to a file, send email, or store to a DB in the future -->
    <int:channel id="errorChannel"/>

    <int:service-activator input-channel="errorChannel" ref="notificationErrorHandler" method="handleFailedNotification"/>

    <!-- Infrastructure -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${spring.rabbitmq.host}"
                               port="${spring.rabbitmq.port}"
                               username="${spring.rabbitmq.username}"
                               password="${spring.rabbitmq.password}"/>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:fanout-exchange name="notification.exchange"/>

I also have an error handler:

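import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;

public class NotificationErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(NotificationErrorHandler.class);

    // Messages on errorChannel carry the exception as their payload; the message
    // that failed is reachable through MessagingException.getFailedMessage().
    public void handleFailedNotification(Message<MessagingException> message) {
        Message<?> failed = message.getPayload().getFailedMessage();
        Object payload = (failed != null) ? failed.getPayload() : null;
        if (!(payload instanceof Map)) {
            // For example, malformed JSON that was never converted to a Map.
            LOG.error("[Notification-Error-Handler] Unprocessable payload: {}", payload, message.getPayload());
            return;
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> map = (Map<String, Object>) payload;
        saveToBD(Notification.fromMap(map));
    }

    // Currently only logs the failed notification.
    private void saveToBD(Notification notification) {
        LOG.error("[Notification-Error-Handler] Couldn't Send Push notification: device='{}', type='{}', pushId='{}', message='{}', uid='{}'",
                new Object[]{notification.getDevice(),
                        notification.getDeviceType(),
                        notification.getDeviceToken(),
                        notification.getBody(),
                        notification.getUid()});
    }
}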

Here is the exception:

Caused by: org.springframework.messaging.MessagingException: Failure occured in AMQP listener while attempting to convert and dispatch Message.; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('}' (code 125)): was expecting double-quote to start field name
 at [Source: [B@7a707c2c; line: 7, column: 2]
    at org.springframework.integration.amqp.channel.AbstractSubscribableAmqpChannel$DispatchingMessageListener.onMessage(AbstractSubscribableAmqpChannel.java:202)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
    ... 10 common frames omitted
Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('}' (code 125)): was expecting double-quote to start field name
 at [Source: [B@7a707c2c; line: 7, column: 2]
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:160)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:142)
    at org.springframework.integration.amqp.channel.AbstractSubscribableAmqpChannel$DispatchingMessageListener.onMessage(AbstractSubscribableAmqpChannel.java:181)
    ... 11 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('}' (code 125)): was expecting double-quote to start field name
 at [Source: [B@7a707c2c; line: 7, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:437)

I hope someone can help.

Thanks in advance, GM


Changes made based on @Gary's answer; it is working now:

<!-- Infrastructure -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${spring.rabbitmq.host}"
                               port="${spring.rabbitmq.port}"
                               username="${spring.rabbitmq.username}"
                               password="${spring.rabbitmq.password}"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

    <rabbit:direct-exchange name="notification.direct">
        <rabbit:bindings>
            <rabbit:binding queue="notification.queue" key="notification.queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:queue id="notification.queue" name="notification.queue"/>

    <int-amqp:inbound-channel-adapter channel="inputChannel"
                                      queue-names="notification.queue"
                                      connection-factory="connectionFactory"
                                      error-channel="errorChannel"/>

    <int:json-to-object-transformer input-channel="inputChannel"
                                    output-channel="notificationChannel"
                                    type="java.util.Map"/>

    <int:channel id="notificationChannel">
        <int:interceptors>
            <int:wire-tap channel="loggingChannel"/>
        </int:interceptors>
    </int:channel>

    <int:logging-channel-adapter id="loggingChannel" log-full-message="true" logger-name="tapInbound" level="INFO"/>

    <!-- depending on the deviceType route to either apnsChannel or gcmChannel -->
    <int:router ref="notificationTypeRouter" input-channel="notificationChannel"/>

    <!-- apple push notification channel-->
    <int:channel id="apnsChannel"/>

    <!-- service activator to process disruptionNotificationChannel -->
    <int:service-activator input-channel="apnsChannel" ref="apnsPushNotificationService" method="pushNotification"/>

    <!-- google cloud messaging notification channel-->
    <int:channel id="gcmChannel"/>

    <!-- service activator to process disruptionNotificationChannel -->
    <int:service-activator input-channel="gcmChannel" ref="gcmPushNotificationService" method="pushNotification"/>

    <!-- no op channel where message is logged for unknown devices -->
    <int:channel id="noOpChannel"/>

    <!-- service activator to process disruptionNotificationChannel -->
    <int:service-activator input-channel="noOpChannel" ref="noOpPushNotificationService" method="pushNotification"/>

    <!-- error channel; may log to a file, send email, or store to a DB in the future -->
    <int:channel id="errorChannel"/>

    <int:service-activator input-channel="errorChannel" ref="notificationErrorHandler"/>

Accepted answer

Why are you starting the flow with a publish-subscribe channel? It is not normal to use a pub/sub channel for message distribution.

If you can use a message-driven channel adapter instead, you can add an error channel to it.

You cannot add an error channel to a publish-subscribe channel. You can, however, inject an error handler (an implementation of org.springframework.util.ErrorHandler) and throw an AmqpRejectAndDontRequeueException when you detect a fatal error.
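
A minimal sketch of the error-handler option, assuming the int-amqp namespace in your version exposes an error-handler attribute on the channel element (worth checking against the XSD you use). The bean id rejectingErrorHandler and the class com.example.RejectingErrorHandler are hypothetical names, not part of the original configuration; the class is assumed to implement org.springframework.util.ErrorHandler and to throw AmqpRejectAndDontRequeueException from handleError(Throwable) when the cause chain contains a JsonParseException.

```xml
<!-- Hypothetical bean: implements org.springframework.util.ErrorHandler and
     throws AmqpRejectAndDontRequeueException for errors it considers fatal,
     such as a JsonParseException somewhere in the cause chain. -->
<bean id="rejectingErrorHandler" class="com.example.RejectingErrorHandler"/>

<!-- Same channel as in the question, with the error handler injected
     (error-handler is assumed to be available in your schema version). -->
<int-amqp:publish-subscribe-channel id="inputChannel"
                                    connection-factory="connectionFactory"
                                    exchange="notification.exchange"
                                    error-handler="rejectingErrorHandler"/>
```

With the message rejected and not requeued, the broker stops redelivering it, which is what ends the loop described in the question. The rejected message is discarded unless the queue is configured with a dead-letter exchange.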

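You can also use a JSON MessageConverter in the channel instead of a JSON transformer downstream in the flow. In that case, the default error handler detects the message conversion exception and rejects the message rather than requeuing it.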
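
The converter option could look roughly like the sketch below. It assumes the channel element accepts a message-converter attribute in your schema version, that Jackson 2 is on the classpath (the stack trace suggests it is), and that publishers set a JSON content type so the converter actually parses the body. The bean id jsonMessageConverter is a name chosen here for illustration, and the bridge simply takes the place of the removed transformer so the rest of the flow stays unchanged.

```xml
<!-- Converts the AMQP message body from JSON before it enters the flow. -->
<bean id="jsonMessageConverter"
      class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

<!-- Conversion now happens in the channel's listener, so a malformed body
     surfaces as a message conversion exception that the default error
     handler rejects without requeuing. -->
<int-amqp:publish-subscribe-channel id="inputChannel"
                                    connection-factory="connectionFactory"
                                    exchange="notification.exchange"
                                    message-converter="jsonMessageConverter"/>

<!-- Replaces the json-to-object-transformer: payloads are already converted. -->
<int:bridge input-channel="inputChannel" output-channel="notificationChannel"/>
```

Whether the converted payload arrives as a Map depends on the converter's type mapping when no type header is present, so it is worth verifying that with a well-formed test message before relying on it in the router.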

Source: the original question on Stack Overflow, https://stackoverflow.com/questions/32763433/
