java - JMS 轮询器事务

标签 java spring spring-integration spring-transactions spring-jms

我正在使用 Spring Integration 4.1.5 并尝试对事务执行某些操作,但不幸的是我无法也找不到工作示例。我正在尝试设置一个正在查找消息的 JMS 轮询器。一旦收到消息,服务激活器将在数据库中插入一行,并将该消息传递给另一个服务激活器。我想让前两部分成为事务性的消息拾取和数据库插入。我不希望其余流程是事务性的。我使用 Weblogic 作为应用程序容器,因此将使用 WebLogicJtaTransactionManager。

我遇到的问题是我无法使前两部分成为事务性的。要么全有,要么什么都没有。我尝试了很多方法,但我觉得在轮询器上使用建议链是最好的选择。我将能够控制哪些方法将成为事务的一部分。

我见过使用消息驱动监听器的示例,但我正在使用 Weblogic 并且将使用工作管理器,并且我相信我必须使用轮询器才能利用工作管理器(如果情况并非如此,我想这将是 future 的另一个问题!)

我已经获取了 xml 并对其进行了简化,但是除了编辑包名称之外,上下文还会产生问题。

   <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
    xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/file 
    http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
    http://www.springframework.org/schema/integration/sftp 
    http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd 
    http://www.springframework.org/schema/integration/xml 
    http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop.xsd">


    <bean id="jtaTransactionManager"
        class="org.springframework.transaction.jta.WebLogicJtaTransactionManager">
        <property name="transactionManagerName" value="javax.transaction.TransactionManager" />
    </bean>

    <bean id="insertMessageToDb" class="com.ReadMsgFromAxway" />
    <bean id="serviceActivator" class="com.CreateTMDFile" />

    <int-jms:inbound-channel-adapter id="jmsDefaultReceiver"
        connection-factory="inboundDefaultAdaptorConnectionFactory"
        extract-payload="false" destination="inboundAdaptorDefaultListenerQueue"
        channel="inboundJMS" acknowledge="transacted">  
        <int:poller id="poller" 
            max-messages-per-poll="100" fixed-rate="10">
            <int:advice-chain>
                <ref bean="txAdvice" />
            </int:advice-chain>
        </int:poller>
    </int-jms:inbound-channel-adapter>


    <tx:advice id="txAdvice" transaction-manager="jtaTransactionManager">
        <tx:attributes>
            <tx:method name="processMessage" propagation="REQUIRED"/>
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <aop:pointcut id="txOperation" 
            expression="execution(* axway.ReadMsgFromAxway.processMessage(..))"/>
        <aop:advisor advice-ref="txAdvice" pointcut-ref="txOperation" />
    </aop:config> 

    <int:service-activator input-channel="inboundJMS"
        output-channel="serviceActivatorChannel" ref="insertMessageToDb" method="processMessage" />

    <int:chain input-channel="serviceActivatorChannel" output-channel="nullChannel">
        <int:service-activator ref="serviceActivator" />
    </int:chain>

</beans>

ReadMsgFromAxway.java

    public Message<File> processMessage(Message<?> message)  {
//Insert into DB
        trackerProcess.insertUpdateMessageTracker(message, "axwayChannel",
                "axwayChannel", currentStatusID, null, null);
        count++;
        int mod = count % 2;
        if (mod != 0) {
            // pass every 2
            String hello = "hey";
        } else {
            throw new RuntimeException("Testing transactional");
        }

        Message<File> springMessage = MessageBuilder.createMessage(payloadFile,
                messageHeaders);
        return springMessage;
    }

无论是抛出运行时异常,还是在下一个服务激活器组件处抛出异常,XML 本身都不执行任何操作。

如果我将建议属性更改为

        <tx:method name="*" propagation="REQUIRED"/>

然后第一个和第二个服务激活器处的异常导致回滚。

奇怪的是,如果我这样做

        <tx:method name="processMessage" propagation="REQUIRED"/>
        <tx:method name="*" propagation="NEVER"/>

然后,无论是第一个服务激活器还是第二个服务激活器中抛出运行时异常,消息都会回滚。我认为切入点会限制哪个类会导致事务,但我可能误解了一些东西。

另一个注意事项 - 此应用程序与其他几个 war 一起存放在一个 Ear 文件中。该上下文启动整个入站流程,并通过 JMS 队列连接到另一个包含业务逻辑的 war。在使用 method name="*"的场景中,我看到业务逻辑 war 中的异常导致原始入站消息的 JMS 消息也被回滚。我的印象是第二次 war 将在另一个线程中进行处理,因为它通过队列接收消息,因此不属于事务的一部分。这可能是容器管理的 JTA 的副作用吗?

谢谢!

最佳答案

我建议您阅读 Dave Syer 的 article关于交易,您可以在“更多类似内容”中找到链接。

现在看起来你根本不了解事务和AOP。你应该多关注Spring Framework中的AOP支持.

一般来说,声明性事务(方法上的 @Transactional)是 AOP 建议的特殊情况。任何 AOP 背后的主要概念是由我们指定建议的方法调用产生的调用堆栈边界。

但是如果目标对象中没有这样的方法,则不会将其建议并包装到 AOP 代理。就像您的 processMessage 的情况一样当您应用txAdvice时对于 org.springframework.messaging.Message.MessageSource 周围的一些内部对象作为 JmsDestinationPollingSource 的契约(Contract)为此<int-jms:inbound-channel-adapter> .

您可以使用 <transactional> <poller>的配置。右图:所有下游流量都将被交易覆盖。

在这种情况下完成该内部的交易

    Callable<Boolean> pollingTask = new Callable<Boolean>() {

        @Override
        public Boolean call() throws Exception {
            return doPoll();
        }
    };

大约receive()handleMessage() ,您应该像常规 @Transactional 一样完成方法调用。案件。为此,我们应该将下一个消息处理交给不同的线程。只是因为默认情况下所有 <channel>DirectChannel并与当前调用堆栈绑定(bind)。

为此,您可以使用 ExecutorChannel ( <int:dispatcher task-executor="threadPoolExecutor"/> )为您的 serviceActivatorChannel .

从那里您不需要其余的 AOP 配置。

不确定您针对其他网络应用程序的第二个问题。看起来它有一些逻辑可以在你这边出现一些不一致的情况下回滚它的工作。但无论如何,这看起来像是一个不同的问题。

关于java - JMS 轮询器事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37017917/

相关文章:

java - spring security不允许用户登录,不显示任何错误

java - 我无法使用 spring mvc 和 jsp 访问我的 web 应用程序中的 js 和 css 资源

error-handling - Spring集成中的错误处理

java - 带有外部 Web 服务监控的 Spring Integration 应用程序

spring-integration - spring 集成 sftp java dsl : cannot resolve method handleWithAdapter

java - 无法使用restFB 在我的墙上发帖

java - Postman 中仅显示响应实体

java - 如何使用 jax-ws commons 配置 jax-ws 以与 Spring 一起使用?

java - 使用screencap截屏时出现0字节文件

java - Wicket vs Vaadin