java - Seam Hibernate gives the same EntityManager instance to two separate threads

Tags: java jpa callback ejb-3.0 seam

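I'm new to the Java/Hibernate/Seam way of development, but I seem to have a strange issue with Hibernate and concurrent threads.

I have an application-scoped Seam component (Orchestrator.java) that is executed through EJB timers at a set interval, calling the method startProcessingWorkloads.

This method has an injected EntityManager, which it uses to check the database for a collection of data. If it finds a workload, it creates a new asynchronous Seam component (LoadController.java) and executes the start() method on that controller.

LoadController has an EntityManager injected and uses it to perform a very large transaction (approximately one hour).

Once the LoadController is running as a separate thread, the Orchestrator is still executed as a thread at the set interval, for example: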

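Minute 1
Orchestrator - looks for a workload (none found) (thread 1)

Minute 2
Orchestrator - looks for a workload (finds one, starts LoadController) (thread 1)
LoadController - begins updating database records (thread 2)

Minute 3
Orchestrator - looks for a workload (none found) (thread 1)
LoadController - still updating database records (thread 2)

Minute 4
Orchestrator - looks for a workload (none found) (thread 1)
LoadController - still updating database records (thread 2)

Minute 5
Orchestrator - looks for a workload (none found) (thread 1)
LoadController - finishes updating database records (thread 2)

Minute 6
Orchestrator - looks for a workload (none found) (thread 1)

Minute 7
Orchestrator - looks for a workload (none found) (thread 1)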

However, I receive an intermittent error (see below) when the Orchestrator runs concurrently with the LoadController.

5:10:40,852 WARN [AbstractBatcher] exception clearing maxRows/queryTimeout java.sql.SQLException: Connection is not associated with a managed connection.org.jboss.resource.adapter.jdbc.jdk6.WrappedConnectionJDK6@1fcdb21

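This error is thrown after the Orchestrator has completed its SQL query, at the point where the LoadController tries to execute its next SQL query.

I did some research and came to the conclusion that the EntityManager was being closed, so the LoadController could no longer use it.

Confused as to what exactly was closing the connection, I did some basic object dumps of the entity manager objects used by the Orchestrator and the LoadController each time the components are called. I found that the following happens just before I receive the error above: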

2010-07-30 15:06:40,804 INFO [processManagement.LoadController] (pool-15-thread-2) org.jboss.seam.persistence.EntityManagerProxy@7e3da1

2010-07-30 15:10:40,758 INFO [processManagement.Orchestrator] (pool-15-thread-1) org.jboss.seam.persistence.EntityManagerProxy@7e3da1

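It appears that during one of the Orchestrator's execution intervals it obtains a reference to the same EntityManager that the LoadController is currently using. When the Orchestrator completes its SQL execution, it closes the connection, and the LoadController can no longer execute its updates.

So my question is: does anyone know of this happening, or have I got my threading all muddled up in this code?

From my understanding, when an EntityManager is injected, a new instance is injected from the EntityManagerFactory and remains with that particular object until the object leaves scope (in this case they are stateless, so when the start() method ends). How could the same instance of an entity manager be injected into two separate threads?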
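The sequence suspected here (one caller releasing a handle that another caller is still using) can be modeled in plain Java. This is only a minimal sketch of the symptom, not of Seam's or Hibernate's actual behavior: `FakeHandle`, `SharedHandleDemo`, `runOn` and `replay` are hypothetical names invented for the illustration, and the handle is a simple flag rather than a real connection.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a connection-backed handle; NOT a Seam or JPA type.
class FakeHandle {
    private volatile boolean open = true;

    String query(String caller) {
        if (!open) {
            throw new IllegalStateException(caller + ": handle already closed");
        }
        return caller + ": ok";
    }

    void close() {
        open = false;
    }
}

class SharedHandleDemo {

    // Submits the task to the pool and returns its result, or the failure message.
    static String runOn(ExecutorService pool, Callable<String> task) {
        try {
            return pool.submit(task).get();
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Replays the order of events from the logs:
    // worker query, then poller query + close, then the worker's next query.
    static String replay() {
        FakeHandle shared = new FakeHandle(); // one instance visible to both tasks
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            runOn(pool, () -> shared.query("worker"));
            runOn(pool, () -> {
                String result = shared.query("poller");
                shared.close(); // the poller finishes and releases the shared handle
                return result;
            });
            // The worker's next step now fails because the handle is gone.
            return runOn(pool, () -> shared.query("worker"));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "worker: handle already closed"
    }
}
```

The steps are sequenced with `Future.get()` so the outcome is deterministic. In the real application the two calls overlap in time, which would explain why the error is intermittent.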

Orchestrator.java

@Name("processOrchestrator")
@Scope(ScopeType.APPLICATION)
@AutoCreate 
public class Orchestrator {

  //___________________________________________________________

  @Logger Log log;

  @In EntityManager entityManager;

  @In LoadController loadController;

  @In WorkloadManager workloadManager;

  //___________________________________________________________

  private int fProcessInstanceCount = 0;

  //___________________________________________________________

  public Orchestrator() {}

  //___________________________________________________________

  synchronized private void incrementProcessInstanceCount() {
    fProcessInstanceCount++;
  }

  //___________________________________________________________

  synchronized private void decreaseProcessInstanceCount() {
    fProcessInstanceCount--;
  }

  //___________________________________________________________

  @Observer("controllerExceptionEvent") 
  synchronized public void controllerExceptionListiner(Process aProcess, Exception aException) {
    decreaseProcessInstanceCount();

    log.info(
      "Controller " + String.valueOf(aProcess) + 
      " failed with the error [" + aException.getMessage() + "]"
    );

    Events.instance().raiseEvent(
      Application.ApplicationEvent.applicationExceptionEvent.name(), 
      aException,
      Orchestrator.class
    );
  }

  //___________________________________________________________

  @Observer("controllerCompleteEvent") 
  synchronized public void successfulControllerCompleteListiner(Process aProcess, long aWorkloadId) {
    try {
      MisWorkload completedWorklaod = entityManager.find(MisWorkload.class, aWorkloadId);
      workloadManager.completeWorkload(completedWorklaod);
    } catch (Exception ex) {
      log.error(ex.getMessage(), ex);
    }

    decreaseProcessInstanceCount();

    log.info("Controller " + String.valueOf(aProcess) + " completed successfuly");
  }

  //___________________________________________________________

  @Asynchronous
  public void startProcessingWorkloads(@IntervalDuration long interval) {
    log.info("Polling for workloads.");
    log.info(entityManager.toString());
    try {
      MisWorkload pendingWorkload = workloadManager.getNextPendingWorkload();

      if (pendingWorkload != null) {
        log.info(
          "Pending Workload found (Workload_Id = " + 
          String.valueOf(pendingWorkload.getWorkloadId()) + 
          "), starting process controller."
        );

        Process aProcess = pendingWorkload.retriveProcessIdAsProcess();

        ControllerIntf controller = createWorkloadController(aProcess);          

        if (controller != null) {
          controller.start(aProcess, pendingWorkload.getWorkloadId());
          workloadManager.setWorkloadProcessing(pendingWorkload);
        }
      }

    } catch (Exception ex) {
      Events.instance().raiseEvent(
        Application.ApplicationEvent.applicationExceptionEvent.name(), 
        ex,
        Orchestrator.class
      );
    }

    log.info("Polling complete.");
  }

  //___________________________________________________________  

  private ControllerIntf createWorkloadController(Process aProcess) {
    ControllerIntf newController = null;

    switch(aProcess) {
      case LOAD:
        newController = loadController;
        break;

      default:
        log.info(
          "createWorkloadController() does not know the value (" +
          aProcess.name() + 
          ") no controller will be started."
        );
    }

    // If a new controller is created then increase the 
    // count of started controllers so that we know how
    // many are running.
    if (newController != null) {
      incrementProcessInstanceCount();
    }

    return newController;
  }

  //___________________________________________________________

}

LoadController.java

@Name("loadController")
@Scope(ScopeType.STATELESS)
@AutoCreate
public class LoadController implements ControllerIntf {
  //__________________________________________________

  @Logger private Log log;

  @In private EntityManager entityManager; 

  //__________________________________________________

  private String fFileName = "";
  private String fNMDSFileName = "";
  private String fAddtFileName = "";

  //__________________________________________________

  public LoadController(){  }
  //__________________________________________________

  @Asynchronous 
  synchronized public void start(Process aProcess, long aWorkloadId) {
    log.info(
      LoadController.class.getName() + 
      " process thread was started for WorkloadId [" + 
      String.valueOf(aWorkloadId) + "]."
    );
    log.info(entityManager.toString());
    try {
      Query aQuery = entityManager.createQuery(
        "from MisLoad MIS_Load where Workload_Id = " + String.valueOf(aWorkloadId)
      );

      MisLoad misLoadRecord = (MisLoad)aQuery.getSingleResult();

      fFileName = 
        misLoadRecord.getInitiatedBy().toUpperCase() + "_" +
        misLoadRecord.getMdSourceSystem().getMdState().getShortName() + "_" +
        DateUtils.now(DateUtils.FORMAT_FILE) + ".csv"
      ;

      fNMDSFileName = "NMDS_" + fFileName;
      fAddtFileName = "Addt_" + fFileName;

      createDataFile(misLoadRecord.getFileContents());

      ArrayList<String>sasCode = generateSASCode(
        misLoadRecord.getLoadId(),
        misLoadRecord.getMdSourceSystem().getPreloadFile()
      );

      //TODO: As the sas password will be encrypted in the database, we will
      //      need to decrypt it before passing to the below function
      executeLoadSASCode(
        sasCode, 
        misLoadRecord.getInitiatedBy(), 
        misLoadRecord.getSasPassword()
      );

      createWorkloadContentRecords(aWorkloadId, misLoadRecord.getLoadId());

      //TODO: Needs to remove password from DB when complete
      removeTempCSVFiles();

      Events.instance().raiseEvent(
        Application.ApplicationEvent.controllerCompleteEvent.name(), 
        aProcess, 
        aWorkloadId
      );

      log.info(LoadController.class.getName() + " process thread completed.");
    } catch (Exception ex) {
      Events.instance().raiseEvent(
        Application.ApplicationEvent.controllerExceptionEvent.name(),
        aProcess, 
        ex
      );
    }
  }
  //__________________________________________________

  private void createDataFile(byte[] aFileContent) throws Exception {
    File dataFile = 
      new File(ECEConfig.getConfiguration().sas_tempFileDir() + "\\" + fFileName);

    FileUtils.writeBytesToFile(dataFile, aFileContent, true);
  }

  //__________________________________________________

  private ArrayList<String> generateSASCode(long aLoadId, String aSourceSystemPreloadSasFile) {
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
    ArrayList<String> sasCode = new ArrayList<String>();

    sasCode.add("%let sOracleUserId = " + ECEConfig.getConfiguration().oracle_username() + ";");
    sasCode.add("%let sOraclePassword = " + ECEConfig.getConfiguration().oracle_password() + ";");
    sasCode.add("%let sOracleSID = " + ECEConfig.getConfiguration().oracle_sid() + ";");
    sasCode.add("%let sSchema = " + ECEConfig.getConfiguration().oracle_username() + ";");
    sasCode.add("%let sECESASSourceDir = " + ECEConfig.getConfiguration().sas_sourceDir() + ";");    
    sasCode.add("libname lOracle ORACLE user=&sOracleUserId pw=&sOraclePassword path=&sOracleSID schema=&sSchema;");

    sasCode.add("%let sCommaDelimiter = %str(" + ECEConfig.getConfiguration().dataload_csvRawDataFileDelimiter() + ");");
    sasCode.add("%let sPipeDelimiter = %nrquote(" + ECEConfig.getConfiguration().dataload_csvNMDSDataFileDelimiter() + ");");
    sasCode.add("%let sDataFileLocation = " + sasTempDir + "\\" + fFileName + ";");
    sasCode.add("%let sNMDSOutputDataFileLoc = " + sasTempDir + "\\" + fNMDSFileName + ";");
    sasCode.add("%let sAddtOutputDataFileLoc = " + sasTempDir + "\\" + fAddtFileName + ";");
    sasCode.add("%let iLoadId = " + String.valueOf(aLoadId) + ";");

    sasCode.add("%include \"&sECESASSourceDir\\ECE_UtilMacros.sas\";");
    sasCode.add("%include \"&sECESASSourceDir\\" + aSourceSystemPreloadSasFile + "\";");
    sasCode.add("%include \"&sECESASSourceDir\\ECE_NMDSLoad.sas\";");
    sasCode.add("%preload(&sDataFileLocation, &sCommaDelimiter, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter);");
    sasCode.add("%loadNMDS(lOracle, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter, &iLoadId);");

    return sasCode;
  }

  //__________________________________________________

  private void executeLoadSASCode(
    ArrayList<String> aSasCode, String aUserName, String aPassword) throws Exception 
  {
    SASExecutor aSASExecutor = new SASExecutor(
      ECEConfig.getConfiguration().sas_server(),
      ECEConfig.getConfiguration().sas_port(),
      aUserName, 
      aPassword
    );

    aSASExecutor.execute(aSasCode);

    log.info(aSASExecutor.getCompleteSasLog());
  }
  //__________________________________________________

  /**
   * Creates the MIS_UR_Workload_Contents records for 
   * the ECE Unit Record data that was just loaded
   * 
   * @param aWorkloadId
   * @param aMisLoadId
   * @throws Exception
   */

  private void createWorkloadContentRecords(long aWorkloadId, long aMisLoadId) throws Exception {

    String selectionRule = 
      " from EceUnitRecord ECE_Unit_Record where ECE_Unit_Record.loadId = " + 
      String.valueOf(aMisLoadId)
    ;
    MisWorkload misWorkload = entityManager.find(MisWorkload.class, aWorkloadId);
    SeamManualTransaction manualTx = new SeamManualTransaction(
      entityManager, 
      ECEConfig.getConfiguration().manualSeamTxTimeLimit()
    );
    manualTx.begin();
    RecordPager oPager = new RecordPager(
      entityManager, 
      selectionRule, 
      ECEConfig.getConfiguration().recordPagerDefaultPageSize()
    );

    Object nextRecord = null;

    while ((nextRecord = oPager.getNextRecord()) != null) {
      EceUnitRecord aEceUnitRecord = (EceUnitRecord)nextRecord;

      MisUrWorkloadContents aContentsRecord = new MisUrWorkloadContents();

      aContentsRecord.setEceUnitRecordId(aEceUnitRecord.getEceUnitRecordId());
      aContentsRecord.setMisWorkload(misWorkload);
      aContentsRecord.setProcessOutcome('C');

      entityManager.persist(aContentsRecord);
    }

    manualTx.commit();
  }

  /**
   * Removes the CSV temp files that are created for input 
   * into the SAS server and that are created as output.  
   */

  private void removeTempCSVFiles() {
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
    File dataInputCSV = new File(sasTempDir + "\\" + fFileName);
    File nmdsOutputCSV = new File(sasTempDir + "\\" + fNMDSFileName);
    File addtOutputCSV = new File(sasTempDir + "\\" + fAddtFileName);

    if (dataInputCSV.exists()) {
      dataInputCSV.delete();
    }
    if (nmdsOutputCSV.exists()) {
      nmdsOutputCSV.delete();
    }

    if (addtOutputCSV.exists()) {
      addtOutputCSV.delete();
    }
  }
}

SeamManualTransaction.java

public class SeamManualTransaction {

  //___________________________________________________________

  private boolean fObjectUsed = false;
  private boolean fJoinExistingTransaction = true;
  private int fTransactionTimeout = 60; // Default: 60 seconds
  private UserTransaction fUserTx;
  private EntityManager fEntityManager;

  //___________________________________________________________

  /**
   * Set the transaction timeout in seconds (from minutes)
   * 
   * @param aTimeoutInMins The number of minutes to keep the transaction active
   */

  private void setTransactionTimeout(int aTimeoutInSecs) {
    // 60 * aTimeoutInSecs = Timeout in Seconds
    fTransactionTimeout = 60 * aTimeoutInSecs;
  }

  //___________________________________________________________

  /**
   * Constructor 
   * 
   * @param aEntityManager
   */

  public SeamManualTransaction(EntityManager aEntityManager) {
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Constructor
   * 
   * @param aEntityManager
   * @param aTimeoutInSecs
   */

  public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs) {
    setTransactionTimeout(aTimeoutInSecs);
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Constructor
   * 
   * @param aEntityManager
   * @param aTimeoutInSecs
   * @param aJoinExistingTransaction
   */
  public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs, boolean aJoinExistingTransaction) {
    setTransactionTimeout(aTimeoutInSecs);
    fJoinExistingTransaction = aJoinExistingTransaction;
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Starts the new transaction
   * 
   * @throws Exception
   */

  public void begin() throws Exception {
    if (fObjectUsed) {
      throw new Exception(
        SeamManualTransaction.class.getCanonicalName() + 
        " has been used. Create new instance."
      );
    }

    fUserTx = 
      (UserTransaction) org.jboss.seam.Component.getInstance("org.jboss.seam.transaction.transaction"); 

    fUserTx.setTransactionTimeout(fTransactionTimeout);
    fUserTx.begin(); 

    /* If entity manager is created before the transaction 
     * is started (ie. via Injection) then it must join the 
     * transaction 
     */ 
    if (fJoinExistingTransaction) {
      fEntityManager.joinTransaction();
    }
  }

  //___________________________________________________________

  /**
   * Commit the transaction to the database
   * 
   * @throws Exception
   */

  public void commit() throws Exception {
    fObjectUsed = true;
    fUserTx.commit();
  }

  //___________________________________________________________

  /**
   * Rollback the transaction
   * 
   * @throws Exception
   */

  public void rollback() throws Exception {
    fObjectUsed = true;
    fUserTx.rollback();
  }

  //___________________________________________________________

}

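Best answer

In general, injecting an entityManager into a Seam component of scope APPLICATION is not right. An entity manager is something you create, use, and close again, within a scope that is typically much shorter than APPLICATION scope.

Improve this either by choosing smaller scopes with a standard entityManager injection or, if you need the APPLICATION scope, by injecting an EntityManagerFactory instead and then creating, using, and closing the entityManager yourself.

Look in your Seam components.xml to find the name of your EntityManagerFactory component.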
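The "create, use, and close" lifecycle can be sketched as follows. To keep the example runnable without a JPA provider, `StubEntityManagerFactory` and `StubEntityManager` are hypothetical stand-ins that only mirror the shape of JPA's `EntityManagerFactory.createEntityManager()`, `EntityManager.isOpen()` and `EntityManager.close()`. `PollingComponent`, `poll` and `lastUsed` are likewise invented for the illustration and are not taken from the question's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an entity manager; NOT the real JPA type.
class StubEntityManager {
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    final int id = NEXT_ID.incrementAndGet(); // lets the demo tell instances apart
    private boolean open = true;

    boolean isOpen() {
        return open;
    }

    void close() {
        open = false;
    }
}

// Hypothetical stand-in for the factory; NOT the real JPA type.
class StubEntityManagerFactory {
    StubEntityManager createEntityManager() {
        return new StubEntityManager();
    }
}

// Long-lived (application-scoped style) component: it holds the factory,
// never a manager, so no manager instance outlives a single call.
class PollingComponent {
    private final StubEntityManagerFactory factory;
    StubEntityManager lastUsed; // kept only so the demo can inspect it afterwards

    PollingComponent(StubEntityManagerFactory factory) {
        this.factory = factory;
    }

    int poll() {
        StubEntityManager em = factory.createEntityManager(); // create
        try {
            lastUsed = em;
            return em.id; // use: real code would run its query here
        } finally {
            em.close(); // close, even if the work above throws
        }
    }

    public static void main(String[] args) {
        PollingComponent component = new PollingComponent(new StubEntityManagerFactory());
        int first = component.poll();
        int second = component.poll();
        System.out.println(first != second);             // each poll got its own manager
        System.out.println(component.lastUsed.isOpen()); // the manager was closed after use
    }
}
```

With real JPA the same shape would use `EntityManagerFactory.createEntityManager()` and `EntityManager.close()`. How the factory is obtained in Seam depends on the components.xml configuration, which is not shown in the question.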

Source: the original question on Stack Overflow, https://stackoverflow.com/questions/3369012/
