error-handling - 关于 Spring 消息驱动 channel 适配器的查询

标签 error-handling spring-jms message-listener durable-subscription

我正在使用 Spring 的消息驱动 channel 适配器。
我的组件正在使用来自 Tibco 主题的消息并发布到 RabbitMQ 主题

所以消息流如下:
Tibco->(订阅者)组件(发布到)-> RabbitMQ

服务激活器如下所示:正如我们所见,有一个输入 channel 和一个输出 channel 。 bean storeAndForwardActivator 将具有业务逻辑(在方法 createIssueOfInterestOratorRecord 内)

<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />

我还有一个 message=driven-channel-adapter。此适配器将在调用服务适配器之前调用。
<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />

即特别是容器(如下所示)将保存要使用的主题名称 - 这是 DefaultMessageListenerContainer
<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>

这个设置工作得很好。但是在某些情况下,我的消费者/组件会收到“流氓”消息。即一个空的有效负载或 HashMap 的消息类型(而不是纯 TextMessage) - 当我们得到这个时 - 我观察到的是 - 在 DefaultMessageListener 级别捕获了一个异常(即我没有像我的业务 bean 一样,即 storeAndForwardActivator ),因此我的组件没有发回 ACK - 因为这是一个持久的主题 - 在主题上有一个消息的构建 - 这是不可取的。
无论天气如何,有没有办法让我立即确认消息,在 DefaultMessageListener 级别捕获异常?

还是应该在 DefaultMessageListener 中引入错误处理程序?
处理这个问题的最佳方法是什么,有什么建议吗?

问候
D

更新:

我尝试将 errorHandler 添加到 org.springframework.jms.listener.DefaultMessageListenerContainer
如下所示
<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />

    <property name="errorHandler" ref="myErrorHandler"/>
</bean>

myErrorHandler 是一个 bean,如下面的 shpwn
<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />

MyErroHandler 实现 ErrorHandler
 @Service
 public class MyErrorHandler implements ErrorHandler{

private static Log log = LogFactory.getLog(MyErrorHandler.class);

@Override
   public void handleError(Throwable t) {

        if (t instanceof MessageHandlingException) {
            MessageHandlingException exception = (MessageHandlingException) t;
            if (exception != null) {
                org.springframework.messaging.Message<?> message = exception.getFailedMessage();
                Object payloadObject = message.getPayload();
                if (null != payloadObject) {
                    log.info("Payload  is not null, type is: " + payloadObject.getClass());
                }
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
}

}

我注意到异常被捕获(当订阅者使用流氓消息时)。我一直在循环中看到此日志
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 

即由于未提交事务 - 来自持久主题的相同消息被一次又一次地消耗。我的目标是在使用消息后将 ACK 发送回代理(无论天气如何,是否捕获到异常)。

明天我将尝试错误 channel 。

问候
D

最佳答案

添加 error-channel到消息驱动的适配器; ErrorMessage将包含 MessagingException有两个字段的有效载荷; cause (异常(exception))和failedMessage .

如果使用默认 error-channel="errorChannel" ,记录异常。

如果您想做更多的事情,您可以配置自己的错误 channel 并为其添加一些流程。

编辑:

除了您在下面的评论...
payload must not be null不是堆栈跟踪;这是一条信息。

也就是说,payload must not be null看起来像一条 Spring Integration 消息;它可能在消息转换期间被抛出在消息监听器适配器中,在我们到达失败可以转到 error-channel 的地步之前。 ;这样的异常将被抛回容器。

打开 DEBUG 日志并查找此日志条目:

logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");

此外,提供完整的堆栈跟踪。

编辑#2

因此,我通过在自定义 MessageConverter 中强制转换的有效负载为空来重现您的问题.

DMLC 错误处理程序在事务回滚后由容器调用,因此无法停止回滚。

我们可以向适配器添加一个选项来以不同的方式处理此类错误,但这需要一些工作。

同时,解决方法是编写自定义 MessageConverter ;类似于 this Gist 中的那个.

然后,您的服务将不得不处理“收到错误消息”有效负载。

然后,您提供这样的自定义转换器...
<jms:message-driven-channel-adapter id="jmsIn"
        destination="requestQueue" acknowledge="transacted"
        message-converter="converter"
        channel="jmsInChannel" />

<beans:bean id="converter" class="foo.MyMessageConverter" />

关于error-handling - 关于 Spring 消息驱动 channel 适配器的查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28792971/

相关文章:

activemq - 如何捕获 MessageListener 内引发的 ActiveMQ 线程异常

java - 从消息监听器中检索值并在 Main 中打印

java - DefaultMessageListenerContainer 不缩放

java - 如何禁用ActiveMQ中的InactivityMonitor日志?

java.lang.NoClassDefFoundError : Could not initialize class com. ibm.mq.jms.MQQueueConnectionFactory 错误

javascript - 如果 api 服务器已关闭,提醒用户服务器当前不可用的客户端的最佳做法是什么?

c# - 如何在 ShowDialog() 阻塞调用之前注册消息处理程序?

reactjs - 如何在React.js中找到错误(“Unexpected token”)?

php - Codeigniter 和 AJAX MySQL 错误处理

scala:提高这段代码的可读性和风格