java - 在一个全局事务的范围内使用 JTA 同时调用对不同数据源的少量查询

标签 java multithreading spring jta atomikos

我有一个包含 3 个分布式数据源 (com.atomikos.jdbc.AtomikosDataSourceBean) 的应用程序。我使用 Atomikos 事务管理器作为 JTA 实现。每个数据源都适用于 PostgreSQL 数据库。 现在,我相应地对每个数据源调用我的查询,并且一切正常。

我想知道是否可以使用 JTA 并行调用我的查询(多线程,并发)?

我尝试使用 jdbcTemplate (Spring) 在新创建的线程中简单地调用查询。首先,我遇到了一个 Spring 的问题。 Spring 将事务上下文存储在 ThreadLocal 字段中,因此在我的新线程 ( Spring transaction manager and multithreading ) 中没有正确解析它。我已经通过将相同的事务上下文设置到新创建的线程的 ThreadLocal 中解决了这个问题。 但是我在 Atomikos 代码中遇到了同样的问题。它们还将 CompositeTransactionImp 存储在线程范围映射 (BaseTrancationManager#getCurrentTx) 中。但在 Atomikos 的情况下,不可能为新线程设置值。 所以我不能同时执行查询,因为 Atomikos 似乎不支持这种方法。 但我也查看了 JTA 规范并发现了以下内容:“多个线程可能同时与同一全局事务相关联。” (“3.2 TransactionManager 接口(interface)”,http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download)

问题:如何使用 JTA(2 阶段提交)在一个全局事务范围内同时调用对不同数据源的两个或多个查询?

tomcat 上下文中的数据源配置:

<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db1"
          uniqueResourceName="jdbc/db1"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db2"
          uniqueResourceName="jdbc/db2"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db3"
          uniqueResourceName="jdbc/db3"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

spring 上下文中的事务管理器配置:

 <bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
  init-method="init" destroy-method="close" lazy-init="true">
  <property name="forceShutdown" value="false" />
 </bean>

代码:

    final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity);

    // Solving Spring's ThreadLocal issue: saving thread local params
    final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
    final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
    final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
    final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
    final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>();

    // Running query in a separate thread.
    final Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Solving Spring's ThreadLocal issue: setting thread local values to newly created thread.
                for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) {
                    TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
                }
                if (synchronizations != null && !synchronizations.isEmpty()) {
                    TransactionSynchronizationManager.initSynchronization();
                    for (TransactionSynchronization synchronization : synchronizations) {
                        TransactionSynchronizationManager.registerSynchronization(synchronization);
                    }
                }
                TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
                TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);

                // Executing query.
                final String query = "insert into ...";
                NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne);

                template.update(query, parameters);
            } catch (final Throwable ex) {
                exceptionHolder.set(ex);
            }
        }
    });
    thread.start();

    // ... same code as above for other dataSources.

    // allThreds.join(); - joining to all threads.

最佳答案

我认为您解决 TransactionSynchronizationManager 必须使用单线程事务这一事实的想法很有趣,但可能很危险。

在 TransactionSynchronizationManager 中,事务性资源存储在 ThreadLocal Map 中,其中键是资源工厂,我想知道当您使用同一资源工厂对多个线程执行此解决方法时会附加什么 - 它可能不会适用于您的情况,因为您有 3 个数据源 -。 (乍一看,我会说您的一个交易资源将被另一个替换,但也许我遗漏了一些东西......)。

无论如何,我认为您可以尝试使用 javax.transaction.TransactionManager.resume()实现您想要做的事情。

想法是直接使用 JTA api,从而绕过单线程 Spring 事务支持。

下面是一些代码来说明我的想法:

@Autowired
JtaTransactionManager txManager;  //from Spring

javax.transaction.TransactionManager jtaTransactionManager;

public void parallelInserts() {
    jtaTransactionManager = txManager.getTransactionManager();  //we are getting the underlying implementation
    jtaTransactionManager.begin();
    final Transaction jtaTransaction  = jtaTransactionManager.getTransaction();
    try {
      Thread t1 = new Thread(){
        @Override
        public void run() {
            try {
                jtaTransactionManager.resume(jtaTransaction);
                //... do the insert
            } catch (InvalidTransactionException e) {
                try {
                    jtaTransaction.setRollbackOnly();
                } catch (SystemException e1) {
                    e1.printStackTrace();
                }
                e.printStackTrace();
            } catch (SystemException e) {
                e.printStackTrace();
            }
        }
      };
      t1.start();
      //same with t2 and t3
    } catch (Exception ex) {
        jtaTransactionManager.setRollbackOnly();
        throw ex;
    }
    //join threads and commit
    jtaTransactionManager.commit();
}

我认为这个解决方案可能有效(我必须说我自己没有尝试过)。我现在看到的唯一限制是您不能重新使用线程,因为没有与 resume() 调用对应的部分,并且您第二次调用 resume() 时可能会遇到 IllegalStateException。

关于java - 在一个全局事务的范围内使用 JTA 同时调用对不同数据源的少量查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18847256/

相关文章:

java - IntelliJ GUI Designer Maven 可执行 JAR 导出

java - Spring MVC 和 java 8 的重定向问题

java - 为什么我的 ElGamal 实现不适用于长文本字符串?

java - Map 数据结构上的 Containkey(List<String> key ) 和 get(List<String> key ) 操作

c - 如何区分一个子进程与其他子进程

java - 多线程问题无法等待线程完成

c++ - 将 std::packaged_task 添加到现有线程?

java - 如何使用 gradle bootRun 添加命令行属性?

spring - 如何将 Mule 表达式传递给 spring 属性

java - QueryDsl - 对关联实体的多个字段进行 JPA 查询