我有一个 Spring 配置的 Web 应用程序,它通过以下监听器接收 JMS 消息:
public class EntityPersister implements MessageListener {
@Resource
private EntityManager entityManager;
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
Object entity = createEntity(textMessage);
entityManager.persist(entity);
entityManager.flush(); //for debugging only
}
}
}
当我在应用程序中执行此监听器时,我从 entityManager.flush()
行收到 NoTransactionException
。
我需要配置什么才能使实体管理器参与已经存在的 JTA 事务?
我已经在上述实现上尝试了@Transactional
,但没有成功。
ActiveMQ 用作 JMS 提供程序。 Spring 配置为:
<bean id="jmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="atomikos-activemq" />
<property name="xaConnectionFactory">
<!-- ActiveMQ wird als JMS Provider genutzt -->
<bean id="activeMQXAConnectionFactory"
class="org.apache.activemq.spring.ActiveMQXAConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
</property>
<property name="maxPoolSize" value="2" />
<property name="localTransactionMode" value="false" />
</bean>
<bean id="entityPersister" class="EntityPersister" />
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destinationName" ref="entityDestinationName" />
<property name="messageListener" ref="entityPersister" />
<property name="sessionTransacted" value="true" />
<property name="transactionManager" ref="txManager" />
</bean>
OpenJPA 用作 JPA 提供程序。持久化单位是:
<persistence-unit name="somePU" transaction-type="JTA">
<jta-data-source>managedDataSource</jta-data-source>
<non-jta-data-source>nonManagedDataSource</non-jta-data-source>
<!-- some entity class listed here -->
<properties>
<property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema(ForeignKeys=true)" />
</properties>
</persistence-unit>
Spring 配置为:
<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<!-- when close is called, should we force transactions to terminate or
not? -->
<property name="forceShutdown" value="true" />
</bean>
<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="txManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
<!-- enable the configuration of transactional behavior based on annotations -->
<tx:annotation-driven transaction-manager="txManager" />
<bean id="entityManagerFactory"
class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
<property name="persistenceUnitName" value="somePU" />
<property name="jpaPropertyMap">
<map>
<entry key="openjpa.ManagedRuntime" value="jndi" />
</map>
</property>
</bean>
<bean id="entityManager" factory-bean="entityManagerFactory"
factory-method="createEntityManager" />
OpenJPA 通过 JNDI 从持久性单元查找 XA 和非 XA 数据源。
最佳答案
如果您使用 setMessageListener()
,则 onMessage
中的消息接收会在单独的线程中异步发生。
entityManager
的注入(inject)发生在不同的线程中。通常事务是线程绑定(bind)的。因此,在消息接收开始之前,注入(inject)线程可能已经完成了其工作,并且其关联的事务已关闭。即使该事务仍然存在,也不会被纳入消息接收线程使用的同一全局事务中。
您可以通过在 onMessage
中显式创建/获取 entityManager
来验证它。这样,所有 XA 事务感知资源都应加入同一全局事务中,因为所有这些资源都是从同一线程打开的。
关于java - 为什么 JMS MessageListener 中使用的实体管理器不参与 JTA 事务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18572434/