java - Hibernate persistence with Apache Spark causes the process to block

Tags: java hibernate concurrency transactions apache-spark

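I am working on a long-running process that should find the RSS feeds that all users in the system have subscribed to, parse those feeds, extract the new entries, and store them back to the database as Hibernate entities so that users can retrieve them. I want to use Apache Spark to enable parallel processing, since this process can take several hours depending on the number of users.

The approach I thought should work was to use useridsRDD.foreachPartition, so that I can have a separate Hibernate session for each partition. I created a database session manager that is initialized for each partition and keeps the Hibernate session alive until the process ends.

Once all the RSS feeds from one source are parsed and the feed entities are created, I send the whole list to a database manager method, which saves the whole list in a batch: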

public class DiggestGeneratorDAOImpl extends GenericDAOImpl implements
        DiggestGeneratorDAO {

    public DiggestGeneratorDAOImpl() {
        // Each DAO instance opens its own session
        setSession(HibernateUtil.getSessionFactory().openSession());
    }

    public <T extends BaseEntity> void saveInBatch(List<T> entities) {
        try {
            boolean isActive = session.getTransaction().isActive();
            if (!isActive) {
                session.beginTransaction();
            }
            for (T entity : entities) {
                session.save(entity);
            }
            session.getTransaction().commit();
        } catch (Exception ex) {
            if (session.getTransaction() != null) {
                session.getTransaction().rollback();
                ex.printStackTrace();
            }
        }
    }
}
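The DAO above saves the whole list and commits once. Hibernate's batch-processing guidance is to flush and clear the session at intervals that match hibernate.jdbc.batch_size (the configuration shown later in this post sets it to 50), so that the first-level cache does not grow with the list. The following is only a sketch of that chunking logic using JDK classes: BatchSink, saveInChunks and flushCountFor are hypothetical names, and BatchSink stands in for the session's save and flush/clear calls.

```java
import java.util.ArrayList;
import java.util.List;

class BatchFlushSketch {

    /** Hypothetical stand-in for the two session operations used while batching. */
    interface BatchSink<T> {
        void save(T entity);
        void flushAndClear();
    }

    /** Saves all entities, flushing after every batchSize saves; returns the number of flushes. */
    static <T> int saveInChunks(List<T> entities, int batchSize, BatchSink<T> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int flushes = 0;
        int pending = 0;
        for (T entity : entities) {
            sink.save(entity);
            pending++;
            if (pending == batchSize) {
                sink.flushAndClear();
                flushes++;
                pending = 0;
            }
        }
        if (pending > 0) {
            // Flush the last, partially filled batch
            sink.flushAndClear();
            flushes++;
        }
        return flushes;
    }

    /** Convenience for trying the method out with a sink that does nothing. */
    static int flushCountFor(int entityCount, int batchSize) {
        List<Integer> entities = new ArrayList<>();
        for (int i = 0; i < entityCount; i++) {
            entities.add(i);
        }
        return saveInChunks(entities, batchSize, new BatchSink<Integer>() {
            @Override public void save(Integer entity) { }
            @Override public void flushAndClear() { }
        });
    }
}
```

In a real DAO the sink would delegate to the session owned by the caller; this sketch does not address the blocking itself.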

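However, this only works when I have a single Spark partition. With two or more partitions, the whole process blocks as soon as I try to save the first entity. To keep things simple, I tried simplifying the Feed entity so that it neither references nor is referenced by any other entity. It doesn't have any collections either.

Update: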

object DigestManager {
  private def createDailyUserSubscribedRSSFeedDigests(date: Date, usersRDD: RDD[Long]): Unit = {
    usersRDD.foreachPartition { users =>
      generateDailySubscribedRSSFeedsDigestForUsersInPartition(users, date)
    }
  }

  private def generateDailySubscribedRSSFeedsDigestForUsersInPartition(users: Iterator[Long], date: Date): Unit = {
    // One aggregator (and therefore one DAO and one session) per partition
    val feedsAgregator: FeedsAgregator = new FeedsAgregatorImpl
    users.foreach { userid => feedsAgregator.generateDailySubscribedRSSFeedsDigestForUser(userid, date) }
  }
}
/** End of DigestManager **/

public class FeedsAgregatorImpl implements FeedsAgregator {

    private static Logger logger = Logger.getLogger(FeedsAgregatorImpl.class);
    private DiggestGeneratorDAO diggestGeneratorDAO = new DiggestGeneratorDAOImpl();
    private ResourceTokenizer resourceTokenizer = new ResourceTokenizerImpl();
    // feedParser and webPageRelevance are used below; their declarations are not shown in the post

    public void generateDailySubscribedRSSFeedsDigestForUser(Long userid, Date dateFrom) {
        User user = null;
        try {
            user = (User) diggestGeneratorDAO.load(User.class, userid);
        } catch (ResourceCouldNotBeLoadedException ex) {
            ex.printStackTrace();
            return;
        }

        String userTokenizedString = resourceTokenizer.getTokenizedStringForUser(user);
        List<FeedSource> subscribedRssSources = diggestGeneratorDAO.getFeedsPreferences(userid).getSubscribedRssSources();

        for (FeedSource feedSource : subscribedRssSources) {
            List<FeedEntry> entries = parseRSSFeed(null, user, feedSource, userTokenizedString);
            if (!entries.isEmpty()) {
                diggestGeneratorDAO.saveInBatch(entries);
            }
        }
    }

    private List<FeedEntry> parseRSSFeed(User blogOwner, User subscribedUser, FeedSource feedSource, String userTokenizedString) {

        String link = feedSource.getLink();
        List<FeedEntry> feedEntries = new ArrayList<FeedEntry>();
        FeedData feedData = feedParser.readFeed(link, feedSource.getLastCheck());
        if (feedData != null && !feedData.getEntries().isEmpty()) {
            for (FeedMessageData feedMessageData : feedData.getEntries()) {

                FeedEntry feedEntry = new FeedEntry();

                feedEntry.setDateCreated(feedMessageData.getPubDate());
                feedEntry.setTitle(feedMessageData.getTitle());
                feedEntry.setDescription(feedMessageData.getDescription());
                feedEntry.setLink(feedMessageData.getLink());
                feedEntry.setImage(feedMessageData.getThumbnail());
                // feedEntry.setFeedSource(feedSource);
                // feedEntry.setMaker(blogOwner);
                // feedEntry.setSubscribedUser(subscribedUser);

                float relevance = 0.0f;
                if (userTokenizedString != null && !userTokenizedString.isEmpty()) {
                    relevance = webPageRelevance.calculateWebPageRelevanceForUser(link, userTokenizedString);
                    feedEntry.setRelevance(relevance);
                }
                feedEntries.add(feedEntry);
            }
        }
        return feedEntries;
    }
}

Update 2:

I tried to restructure the source code; a simplified version looks like this:

usersSparkRDD.foreachPartition { users =>
  val genericDao = new genericDAO
  val session: Session = HibernateUtil.getSessionFactory().openSession()
  users.foreach { user =>
    // tried this also
    // val genericDao = new genericDAO
    // val session: Session = HibernateUtil.getSessionFactory().openSession()
    val tags = genericDao.loadSomeDataUsingThisSession(session)
    for (tag <- tags) {
      val post = createNewEntity(tag)
      genericDao.saveNewEntityUsingTheSameSession(post, session)
    }
    // session.close()
  }
  session.close()
}

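In this approach the session is created outside the DAO class, and the same session is passed to every DAO method from outside. Again, it works if I have only 1 Spark partition, but not when there are more partitions. There are no threads in the code that creates and closes the session.

Update 3:

Following @Vlad Mihelcea's suggestion, I looked at the thread dump and found that 2 threads are blocked while trying to save an entity. I also took Apache Spark out of the picture, just to make sure it is not the problem, so I created new threads, each of which opens a new session and tries to save an entity in it.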

"Thread-20" #72 daemon prio=5 os_prio=0 tid=0x00007f9098030000 nid=0x57a5 in Object.wait() [0x00007f9094c41000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
    at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
    - locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
    at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
    at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
    at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
    at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
    at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
    at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
    at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
    at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
    at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"Thread-19" #71 daemon prio=5 os_prio=0 tid=0x00007f909802e800 nid=0x57a4 in Object.wait() [0x00007f9094d42000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
    at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
    - locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
    at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
    at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
    at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
    at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
    at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
    at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
    at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
    at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
    at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"Thread-18" #70 daemon prio=5 os_prio=0 tid=0x00007f909802c800 nid=0x57a3 in Object.wait() [0x00007f9094e43000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
    at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
    - locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
    at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
    at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
    at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
    at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
    at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
    at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
    at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
    at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
    at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"Thread-17" #69 daemon prio=5 os_prio=0 tid=0x00007f909802b000 nid=0x57a2 waiting for monitor entry [0x00007f9094f44000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.hibernate.id.MultipleHiLoPerTableGenerator.generate(MultipleHiLoPerTableGenerator.java:147)
    - waiting to lock <0x00000000f2b19fd8> (a org.hibernate.id.MultipleHiLoPerTableGenerator)
    at org.hibernate.event.internal.AbstractSaveEventListener.saveWithGeneratedId(AbstractSaveEventListener.java:118)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.saveWithGeneratedOrRequestedId(DefaultSaveOrUpdateEventListener.java:209)
    at org.hibernate.event.internal.DefaultSaveEventListener.saveWithGeneratedOrRequestedId(DefaultSaveEventListener.java:55)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsTransient(DefaultSaveOrUpdateEventListener.java:194)
    at org.hibernate.event.internal.DefaultSaveEventListener.performSaveOrUpdate(DefaultSaveEventListener.java:49)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:90)
    at org.hibernate.internal.SessionImpl.fireSave(SessionImpl.java:715)
    at org.hibernate.internal.SessionImpl.save(SessionImpl.java:707)
    at org.hibernate.internal.SessionImpl.save(SessionImpl.java:702)
    at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:37)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"Thread-16" #68 daemon prio=5 os_prio=0 tid=0x00007f9098029000 nid=0x57a1 waiting for monitor entry [0x00007f9095045000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.hibernate.id.MultipleHiLoPerTableGenerator.generate(MultipleHiLoPerTableGenerator.java:147)
    - waiting to lock <0x00000000f2b19fd8> (a org.hibernate.id.MultipleHiLoPerTableGenerator)
    at org.hibernate.event.internal.AbstractSaveEventListener.saveWithGeneratedId(AbstractSaveEventListener.java:118)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.saveWithGeneratedOrRequestedId(DefaultSaveOrUpdateEventListener.java:209)
    at org.hibernate.event.internal.DefaultSaveEventListener.saveWithGeneratedOrRequestedId(DefaultSaveEventListener.java:55)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsTransient(DefaultSaveOrUpdateEventListener.java:194)
    at org.hibernate.event.internal.DefaultSaveEventListener.performSaveOrUpdate(DefaultSaveEventListener.java:49)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:90)
    at org.hibernate.internal.SessionImpl.fireSave(SessionImpl.java:715)
    at org.hibernate.internal.SessionImpl.save(SessionImpl.java:707)
    at org.hibernate.internal.SessionImpl.save(SessionImpl.java:702)
    at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:37)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"Thread-15" #67 daemon prio=5 os_prio=0 tid=0x00007f9098027800 nid=0x57a0 in Object.wait() [0x00007f9095145000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
    at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
    - locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
    at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
    at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
    at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
    at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
    at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
    at org.hibernate.engine.transaction.internal.jdbc.JdbcIsolationDelegate.delegateWork(JdbcIsolationDelegate.java:65)
    at org.hibernate.id.MultipleHiLoPerTableGenerator$2.getNextValue(MultipleHiLoPerTableGenerator.java:221)
    at org.hibernate.id.enhanced.LegacyHiLoAlgorithmOptimizer.generate(LegacyHiLoAlgorithmOptimizer.java:77)
    - locked <0x00000000f2b1a330> (a org.hibernate.id.enhanced.LegacyHiLoAlgorithmOptimizer)
    at org.hibernate.id.MultipleHiLoPerTableGenerator.generate(MultipleHiLoPerTableGenerator.java:218)
    - locked <0x00000000f2b19fd8> (a org.hibernate.id.MultipleHiLoPerTableGenerator)
    at org.hibernate.event.internal.AbstractSaveEventListener.saveWithGeneratedId(AbstractSaveEventListener.java:118)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.saveWithGeneratedOrRequestedId(DefaultSaveOrUpdateEventListener.java:209)
    at org.hibernate.event.internal.DefaultSaveEventListener.saveWithGeneratedOrRequestedId(DefaultSaveEventListener.java:55)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsTransient(DefaultSaveOrUpdateEventListener.java:194)
    at org.hibernate.event.internal.DefaultSaveEventListener.performSaveOrUpdate(DefaultSaveEventListener.java:49)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:90)
    at org.hibernate.internal.SessionImpl.fireSave(SessionImpl.java:715)
    at org.hibernate.internal.SessionImpl.save(SessionImpl.java:707)
    at org.hibernate.internal.SessionImpl.save(SessionImpl.java:702)
    at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:37)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"Thread-14" #66 daemon prio=5 os_prio=0 tid=0x00007f9098026800 nid=0x579f in Object.wait() [0x00007f9095247000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
    at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
    - locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
    at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
    at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
    at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
    at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
    at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
    at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
    at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
    at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
    at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
    at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None
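One way to read the dump above (an interpretation, not something the post itself states): Thread-15 holds the MultipleHiLoPerTableGenerator monitor and is waiting in BasicResourcePool.awaitAvailable for an additional connection, Thread-16 and Thread-17 are waiting for that monitor inside save, and Thread-14, Thread-18, Thread-19 and Thread-20 are waiting for a connection in beginTransaction. If every pooled connection is already held by a thread that has begun a transaction, none of them can make progress. The sketch below reproduces that pattern with JDK classes only. The Semaphore stands in for the connection pool and the ReentrantLock for the id generator; all class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class PoolExhaustionDemo {

    /** Returns true if every worker finished its simulated save within two seconds. */
    static boolean allWorkersFinish(int poolSize, int workers) {
        Semaphore pool = new Semaphore(poolSize);                   // stand-in for the connection pool
        ReentrantLock idGeneratorLock = new ReentrantLock();        // stand-in for the id generator monitor
        CountDownLatch allInTransaction = new CountDownLatch(workers);
        ExecutorService executor = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            executor.execute(() -> {
                try {
                    pool.acquire();                                 // "beginTransaction": first connection
                    try {
                        allInTransaction.countDown();
                        allInTransaction.await();                   // wait until every worker holds one
                        idGeneratorLock.lockInterruptibly();
                        try {
                            pool.acquire();                         // id generation wants a second connection
                            pool.release();
                        } finally {
                            idGeneratorLock.unlock();
                        }
                    } finally {
                        pool.release();                             // "commit": return the first connection
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            boolean finished = executor.awaitTermination(2, TimeUnit.SECONDS);
            executor.shutdownNow();                                 // interrupt workers that are still stuck
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("pool=4, workers=4 finished: " + allWorkersFinish(4, 4));
        System.out.println("pool=5, workers=4 finished: " + allWorkersFinish(5, 4));
    }
}
```

In this simulation a pool with exactly as many permits as workers stalls, while a single spare permit lets every worker finish. Whether the real pool was sized that way is not shown in the post.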

Is there something wrong with my Hibernate configuration?

public class HibernateUtil {
private static SessionFactory sessionFactory;

public static SessionFactory getSessionFactory() {
    if (sessionFactory == null) {
        Config config=CommonSettings.getInstance().config;
        String host = config.mysqlConfig.host;
        int port = config.mysqlConfig.port;
        String database = config.mysqlConfig.database;
        String user = config.mysqlConfig.user;
        String password = config.mysqlConfig.password;
        // loads configuration and mappings
        Configuration configuration = new Configuration().configure();
        configuration.setNamingStrategy(ImprovedNamingStrategy.INSTANCE);
        configuration.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQL5InnoDBDialect");
        // Configuration.setProperty takes String values
        configuration.setProperty("hibernate.show_sql", "true");
        configuration.setProperty("hibernate.max_fetch_depth", "0");
        configuration.setProperty("hibernate.hbm2ddl.auto", "update"); // overridden by "validate" further down
        configuration.setProperty("hibernate.jdbc.batch_size", "50");
        configuration.setProperty("hibernate.connection.pool_size", "200");
        configuration.setProperty("hibernate.connection.charSet", "UTF-8");
        configuration.setProperty("hibernate.connection.characterEncoding", "UTF-8");
        configuration.setProperty("hibernate.connection.useUnicode", "true");
        configuration.setProperty("hibernate.connection.autocommit", "false");
        configuration.setProperty("hibernate.cache.use_second_level_cache", "true");
        configuration.setProperty("hibernate.cache.use_query_cache", "true");
        configuration.setProperty("hibernate.cache.use_structured_entries", "true");
        configuration.setProperty("hibernate.cache.region.factory_class", "org.hibernate.cache.EhCacheRegionFactory");
        configuration.setProperty("hibernate.current_session_context_class", "thread");
        configuration.setProperty("hibernate.connection.driver_class", config.mysqlConfig.jdbcDriver);
        configuration.setProperty("hibernate.connection.url", "jdbc:mysql://"
                + host + ":" + port + "/" + database+"?useUnicode=true&characterEncoding=UTF-8");
        configuration.setProperty("hibernate.connection.username", user);
        configuration.setProperty("hibernate.connection.password", password);
        configuration.setProperty("hibernate.show_sql", "true");
        configuration.setProperty("hibernate.hbm2ddl.auto", "validate");

        final Reflections reflections = new Reflections("org.prosolo.common.domainmodel");
        final Set<Class<?>> classes = reflections.getTypesAnnotatedWith(Entity.class);
        for (final Class<?> clazz : classes) {
            configuration.addAnnotatedClass(clazz);
        }
        ServiceRegistry serviceRegistry
            = new StandardServiceRegistryBuilder()
                .applySettings(configuration.getProperties()).build();

        // builds a session factory from the service registry
        sessionFactory = configuration.buildSessionFactory(serviceRegistry);  

    }

    return sessionFactory;
}
}
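A side note on getSessionFactory() above: the `sessionFactory == null` check is not synchronized, so if several threads in the same JVM call it at the same moment, more than one factory could be built. The post does not identify this as the cause of the blocking. As a sketch of one common remedy, the initialization-on-demand holder idiom is shown below with JDK classes only; ExpensiveFactory is a hypothetical stand-in for the SessionFactory, and the other names are illustrative as well.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyFactoryHolder {

    /** Counts constructions so the single initialization can be observed. */
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    /** Hypothetical stand-in for an expensive object such as a SessionFactory. */
    static final class ExpensiveFactory {
        ExpensiveFactory() {
            BUILD_COUNT.incrementAndGet();
        }
    }

    /** The JVM initializes this nested class once, on first access, in a thread-safe way. */
    private static final class Holder {
        static final ExpensiveFactory INSTANCE = new ExpensiveFactory();
    }

    static ExpensiveFactory getFactory() {
        return Holder.INSTANCE;
    }

    /** Calls getFactory() from several threads and returns how many factories were built. */
    static int buildCountAfterConcurrentAccess(int threads) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(LazyFactoryHolder::getFactory);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return BUILD_COUNT.get();
    }
}
```

The idiom relies on class initialization being performed once per class by the JVM, so no explicit locking is needed in getFactory().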

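Update 4 (solution):

It finally turned out to be a problem with the Hibernate data source configuration; switching to the Tomcat DataSource solved it. In addition to the previous configuration, I added the following: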

StandardServiceRegistryBuilder serviceRegistryBuilder = new StandardServiceRegistryBuilder();
serviceRegistryBuilder.applySetting(Environment.DATASOURCE, dataSource());
ServiceRegistry serviceRegistry = serviceRegistryBuilder.applySettings(configuration.getProperties()).build();
sessionFactory = configuration.buildSessionFactory(serviceRegistry);

 /////
 public static DataSource dataSource() {
    MySQLConfig mySQLConfig=CommonSettings.getInstance().config.mysqlConfig;
    String username = mySQLConfig.user;
    String password = mySQLConfig.password;
    String host = mySQLConfig.host;
    int port = mySQLConfig.port;
    String database = mySQLConfig.database;
    String url="jdbc:mysql://"+ host + ":" + port + "/" + database;

    PoolProperties p = new PoolProperties();
    p.setUrl(url+"?useUnicode=true&characterEncoding=UTF-8");
    p.setDriverClassName(CommonSettings.getInstance().config.mysqlConfig.jdbcDriver);
    p.setUsername(username);
    p.setPassword(password);
    p.setJmxEnabled(false);
    p.setTestWhileIdle(false);
    p.setTestOnBorrow(true);
    p.setValidationQuery("SELECT 1");
    p.setTestOnReturn(false);
    p.setValidationInterval(30000);
    p.setTimeBetweenEvictionRunsMillis(1000);
    p.setMaxActive(100);
    p.setInitialSize(10);
    p.setMaxWait(10000);
    p.setRemoveAbandonedTimeout(60);
    p.setMinEvictableIdleTimeMillis(30000);
    p.setMinIdle(10);
    p.setLogAbandoned(true);
    // Reclaim abandoned connections only when not running in distributed mode
    p.setRemoveAbandoned(!CommonSettings.getInstance().config.rabbitMQConfig.distributed);
    p.setJdbcInterceptors(
            "org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;"
            + "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;"
            + "org.apache.tomcat.jdbc.pool.interceptor.ResetAbandonedTimer");
    org.apache.tomcat.jdbc.pool.DataSource ds = new org.apache.tomcat.jdbc.pool.DataSource();
    ds.setPoolProperties(p);
    return ds;
 }

Best answer

You're not really taking advantage of contextual sessions here:

public DiggestGeneratorDAOImpl(){
    setSession(HibernateUtil.getSessionFactory().openSession());
}    

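A session is meant to be bound to a request or an extended context, but in your use case every DAO instance creates its own session.

If you use two separate DAO instances, such as DiggestGeneratorDAOImpl, each of them will use a separate session, a separate transaction, and a separate database connection. This can be tricky, because changes made in one transaction become visible to other transactions only after commit.

The blocking might be caused by a deadlock between two concurrent connections/transactions.

You need to refactor your code so that the business method shares the same Session/Transaction/Connection across all DAO calls.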
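The shape of that refactoring can be sketched as follows. This is only an illustration using JDK classes: RecordingSession is a hypothetical stand-in for a Hibernate Session together with its transaction (it merely records calls), and EntryDao and processUser are made-up names. The point is that the business method opens, commits and closes one session, and the DAO only ever uses the session it is handed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SharedSessionSketch {

    /** Hypothetical stand-in for a Session and its transaction; it only records calls. */
    static final class RecordingSession implements AutoCloseable {
        final List<String> calls = new ArrayList<>();
        void begin()             { calls.add("begin"); }
        void save(String entity) { calls.add("save " + entity); }
        void commit()            { calls.add("commit"); }
        void rollback()          { calls.add("rollback"); }
        @Override public void close() { calls.add("close"); }
    }

    /** A DAO that never opens a session itself; the caller passes one in. */
    static final class EntryDao {
        void saveAll(RecordingSession session, List<String> entries) {
            for (String entry : entries) {
                session.save(entry);
            }
        }
    }

    /** The business method owns one session and one transaction for all DAO calls. */
    static List<String> processUser(List<List<String>> feeds) {
        RecordingSession session = new RecordingSession();
        EntryDao dao = new EntryDao();
        try {
            session.begin();
            for (List<String> feed : feeds) {
                dao.saveAll(session, feed);
            }
            session.commit();
        } catch (RuntimeException ex) {
            session.rollback();
            throw ex;
        } finally {
            session.close();
        }
        return session.calls;
    }

    public static void main(String[] args) {
        System.out.println(processUser(Arrays.asList(
                Arrays.asList("a1", "a2"), Arrays.asList("b1"))));
    }
}
```

With real Hibernate types, the same structure would place openSession, beginTransaction, commit, rollback and close in the business method rather than in the DAO constructor.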

Regarding java - Hibernate persistence with Apache Spark causes the process to block, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32419959/
