我正在开发长期运行的流程,该流程应该找到系统中所有用户都已注册关注的 RSS 提要,解析这些 RSS 提要,提取新条目并将其作为 Hibernate 实体存储回数据库,以便用户可以检索它.我想使用 Apache Spark 来启用并行处理,因为这个过程可能需要几个小时,具体取决于用户数量。
我认为应该可行的方法是使用 useridsRDD.foreachPartition,这样我就可以为每个分区设置单独的 hibernate session 。我创建了为每个分区初始化的数据库 session 管理器,它使 hibernate session 保持 Activity 状态,直到进程结束。
一旦解析了来自一个来源的所有 RSS 提要并创建了提要实体,我就会将整个列表发送到数据库管理器方法,该方法会批量保存整个列表:
public class DiggestGeneratorDAOImpl extends GenericDAOImpl implements
DiggestGeneratorDAO{
public DiggestGeneratorDAOImpl(){
setSession(HibernateUtil.getSessionFactory().openSession());
}
public <T extends BaseEntity> void saveInBatch(List<T> entities) {
try{
boolean isActive = session.getTransaction().isActive();
if ( !isActive) {
session.beginTransaction();
}
for(Object entity:entities){
session.save(entity);
}
session.getTransaction().commit();
}catch(Exception ex){
if(session.getTransaction()!=null) {
session.getTransaction().rollback();
ex.printStackTrace();
}
}
但是,这仅在我有一个 Spark 分区时有效。如果有两个或更多分区,一旦我尝试保存第一个实体,整个过程就会被阻止。为了使事情更简单,我尝试简化 Feed 实体,因此它不引用也不从任何其他实体引用。它也没有任何 Collection 。
更新:
object DigestManager {
private def createDailyUserSubscribedRSSFeedDigests(date:Date, usersRDD:RDD[Long]){
usersRDD.foreachPartition {
users => generateDailySubscribedRSSFeedsDigestForUsersInPartition(users,date)
}
}
private def generateDailySubscribedRSSFeedsDigestForUsersInPartition(users:Iterator[Long],date:Date){
val feedsAgregator:FeedsAgregator =new FeedsAgregatorImpl
users.foreach { userid => feedsAgregator.generateDailySubscribedRSSFeedsDigestForUser(userid, date)}
}
/**End of DigestManager**/
public class FeedsAgregatorImpl implements FeedsAgregator {
private static Logger logger = Logger.getLogger(FeedsAgregatorImpl.class);
private DiggestGeneratorDAO diggestGeneratorDAO=new DiggestGeneratorDAOImpl();
private ResourceTokenizer resourceTokenizer=new ResourceTokenizerImpl();
public void generateDailySubscribedRSSFeedsDigestForUser(Long userid, Date dateFrom) {
User user=null;
try{
user=(User) diggestGeneratorDAO.load(User.class, userid);
}catch(ResourceCouldNotBeLoadedException ex){
ex.printStackTrace();
return;
}
String userTokenizedString = resourceTokenizer.getTokenizedStringForUser(user);
List<FeedSource> subscribedRssSources = diggestGeneratorDAO.getFeedsPreferences(userid).getSubscribedRssSources();
for (FeedSource feedSource : subscribedRssSources) {
List<FeedEntry> entries=parseRSSFeed(null, user, feedSource, userTokenizedString);
if(entries.size()>0){
diggestGeneratorDAO.saveInBatch(entries);
}
}
private List<FeedEntry> parseRSSFeed(User blogOwner, User subscribedUser, FeedSource feedSource, String userTokenizedString) {
String link = feedSource.getLink();
List<FeedEntry> feedEntries=new ArrayList<FeedEntry>();
FeedData feedData = feedParser.readFeed(link, feedSource.getLastCheck());
if (feedData != null && !feedData.getEntries().isEmpty()) {
for (FeedMessageData feedMessageData : feedData.getEntries()) {
FeedEntry feedEntry = new FeedEntry();
feedEntry.setDateCreated(feedMessageData.getPubDate());
feedEntry.setTitle(feedMessageData.getTitle());
feedEntry.setDescription(feedMessageData.getDescription());
feedEntry.setLink(feedMessageData.getLink());
feedEntry.setImage(feedMessageData.getThumbnail());
// feedEntry.setFeedSource(feedSource);
//feedEntry.setMaker(blogOwner);
// feedEntry.setSubscribedUser(subscribedUser);
float relevance = (float) 0.0;
if (userTokenizedString != null && !userTokenizedString.equals("")) {
relevance = webPageRelevance.calculateWebPageRelevanceForUser(link, userTokenizedString);
feedEntry.setRelevance(relevance);
}
feedEntries.add(feedEntry);
}
return feedEntries;
}
更新 2:
我尝试重新设计源代码,它的简化版本看起来像这样:
usersSparkRDD.forearchPartition{
users=>{
val genericDao=new genericDAO
val session:Session=HibernateUtil.getSessionFactory().openSession()
users.foreach{
//tried this also
//val genericDao=new genericDAO
//val session:Session=HibernateUtil.getSessionFactory().openSession()
tags=genericDao.loadSomeDataUsingThisSession(session)
for(tag <- tags){
val post=createNewEntity(tag)
genericDao.saveNewEntityUsingTheSameSession(post,session)
}
//session.close()
}
session.close()
}
}
}
在这种方法中,session 在 DAO 类之外创建,并且相同的 session 在外部传递给每个 DAO 方法。同样,如果我只有 1 个 Spark 分区,它就可以工作,但如果有更多分区,它就不起作用。在创建 session 和关闭 session 的代码中没有任何线程。
更新 3
根据 @Vlad Mihelcea 的建议我调查了线程转储,发现在尝试保存实体时有 2 个线程被阻塞。我还从中排除了 Apache-Spark,只是为了确保它没有问题,所以我创建了新线程,它打开新 session 并尝试在其中保存实体。
"Thread-20" #72 daemon prio=5 os_prio=0 tid=0x00007f9098030000 nid=0x57a5 in Object.wait() [0x00007f9094c41000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
- locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"Thread-19" #71 daemon prio=5 os_prio=0 tid=0x00007f909802e800 nid=0x57a4 in Object.wait() [0x00007f9094d42000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
- locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"Thread-18" #70 daemon prio=5 os_prio=0 tid=0x00007f909802c800 nid=0x57a3 in Object.wait() [0x00007f9094e43000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
- locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"Thread-17" #69 daemon prio=5 os_prio=0 tid=0x00007f909802b000 nid=0x57a2 waiting for monitor entry [0x00007f9094f44000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.hibernate.id.MultipleHiLoPerTableGenerator.generate(MultipleHiLoPerTableGenerator.java:147)
- waiting to lock <0x00000000f2b19fd8> (a org.hibernate.id.MultipleHiLoPerTableGenerator)
at org.hibernate.event.internal.AbstractSaveEventListener.saveWithGeneratedId(AbstractSaveEventListener.java:118)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.saveWithGeneratedOrRequestedId(DefaultSaveOrUpdateEventListener.java:209)
at org.hibernate.event.internal.DefaultSaveEventListener.saveWithGeneratedOrRequestedId(DefaultSaveEventListener.java:55)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsTransient(DefaultSaveOrUpdateEventListener.java:194)
at org.hibernate.event.internal.DefaultSaveEventListener.performSaveOrUpdate(DefaultSaveEventListener.java:49)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:90)
at org.hibernate.internal.SessionImpl.fireSave(SessionImpl.java:715)
at org.hibernate.internal.SessionImpl.save(SessionImpl.java:707)
at org.hibernate.internal.SessionImpl.save(SessionImpl.java:702)
at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:37)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"Thread-16" #68 daemon prio=5 os_prio=0 tid=0x00007f9098029000 nid=0x57a1 waiting for monitor entry [0x00007f9095045000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.hibernate.id.MultipleHiLoPerTableGenerator.generate(MultipleHiLoPerTableGenerator.java:147)
- waiting to lock <0x00000000f2b19fd8> (a org.hibernate.id.MultipleHiLoPerTableGenerator)
at org.hibernate.event.internal.AbstractSaveEventListener.saveWithGeneratedId(AbstractSaveEventListener.java:118)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.saveWithGeneratedOrRequestedId(DefaultSaveOrUpdateEventListener.java:209)
at org.hibernate.event.internal.DefaultSaveEventListener.saveWithGeneratedOrRequestedId(DefaultSaveEventListener.java:55)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsTransient(DefaultSaveOrUpdateEventListener.java:194)
at org.hibernate.event.internal.DefaultSaveEventListener.performSaveOrUpdate(DefaultSaveEventListener.java:49)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:90)
at org.hibernate.internal.SessionImpl.fireSave(SessionImpl.java:715)
at org.hibernate.internal.SessionImpl.save(SessionImpl.java:707)
at org.hibernate.internal.SessionImpl.save(SessionImpl.java:702)
at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:37)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"Thread-15" #67 daemon prio=5 os_prio=0 tid=0x00007f9098027800 nid=0x57a0 in Object.wait() [0x00007f9095145000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
- locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
at org.hibernate.engine.transaction.internal.jdbc.JdbcIsolationDelegate.delegateWork(JdbcIsolationDelegate.java:65)
at org.hibernate.id.MultipleHiLoPerTableGenerator$2.getNextValue(MultipleHiLoPerTableGenerator.java:221)
at org.hibernate.id.enhanced.LegacyHiLoAlgorithmOptimizer.generate(LegacyHiLoAlgorithmOptimizer.java:77)
- locked <0x00000000f2b1a330> (a org.hibernate.id.enhanced.LegacyHiLoAlgorithmOptimizer)
at org.hibernate.id.MultipleHiLoPerTableGenerator.generate(MultipleHiLoPerTableGenerator.java:218)
- locked <0x00000000f2b19fd8> (a org.hibernate.id.MultipleHiLoPerTableGenerator)
at org.hibernate.event.internal.AbstractSaveEventListener.saveWithGeneratedId(AbstractSaveEventListener.java:118)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.saveWithGeneratedOrRequestedId(DefaultSaveOrUpdateEventListener.java:209)
at org.hibernate.event.internal.DefaultSaveEventListener.saveWithGeneratedOrRequestedId(DefaultSaveEventListener.java:55)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsTransient(DefaultSaveOrUpdateEventListener.java:194)
at org.hibernate.event.internal.DefaultSaveEventListener.performSaveOrUpdate(DefaultSaveEventListener.java:49)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:90)
at org.hibernate.internal.SessionImpl.fireSave(SessionImpl.java:715)
at org.hibernate.internal.SessionImpl.save(SessionImpl.java:707)
at org.hibernate.internal.SessionImpl.save(SessionImpl.java:702)
at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:37)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"Thread-14" #66 daemon prio=5 os_prio=0 tid=0x00007f9098026800 nid=0x579f in Object.wait() [0x00007f9095247000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at com.mchange.v2.resourcepool.BasicResourcePool.awaitAvailable(BasicResourcePool.java:1315)
at com.mchange.v2.resourcepool.BasicResourcePool.prelimCheckoutResource(BasicResourcePool.java:557)
- locked <0x00000000f20c30d0> (a com.mchange.v2.resourcepool.BasicResourcePool)
at com.mchange.v2.resourcepool.BasicResourcePool.checkoutResource(BasicResourcePool.java:477)
at com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.checkoutPooledConnection(C3P0PooledConnectionPool.java:525)
at com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource.getConnection(AbstractPoolBackedDataSource.java:128)
at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:90)
at org.hibernate.internal.AbstractSessionImpl$NonContextualJdbcConnectionAccess.obtainConnection(AbstractSessionImpl.java:380)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:228)
at org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.getConnection(LogicalConnectionImpl.java:171)
at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.doBegin(JdbcTransaction.java:67)
at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.begin(AbstractTransactionImpl.java:162)
at org.hibernate.internal.SessionImpl.beginTransaction(SessionImpl.java:1471)
at org.prosolo.bigdata.twitter.TestJava8Paralelizm$1.run(TestJava8Paralelizm.java:31)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
我的 Hibernate 配置有什么问题吗?
public class HibernateUtil {
private static SessionFactory sessionFactory;
public static SessionFactory getSessionFactory() {
if (sessionFactory == null) {
Config config=CommonSettings.getInstance().config;
String host = config.mysqlConfig.host;
int port = config.mysqlConfig.port;
String database = config.mysqlConfig.database;
String user = config.mysqlConfig.user;
String password = config.mysqlConfig.password;
// loads configuration and mappings
Configuration configuration = new Configuration().configure();
configuration.setNamingStrategy(ImprovedNamingStrategy.INSTANCE);
configuration.setProperty("hibernate.dialect","org.hibernate.dialect.MySQL5InnoDBDialect");
configuration.setProperty("hibernate.show_sql", true);
configuration.setProperty("hibernate.max_fetch_depth", 0);
configuration.setProperty("hibernate.hbm2ddl.auto", update);
configuration.setProperty("hibernate.jdbc.batch_size",50);
configuration.setProperty("hibernate.connection.pool_size", 200);
configuration.setProperty("hibernate.connection.charSet", "UTF-8");
configuration.setProperty("hibernate.connection.characterEncoding","UTF-8");
configuration.setProperty("hibernate.connection.useUnicode", true);
configuration.setProperty("hibernate.connection.autocommit", false);
configuration.setProperty("hibernate.cache.use_second_level_cache", true);
configuration.setProperty("hibernate.cache.use_query_cache", true);
configuration.setProperty("hibernate.cache.use_structured_entries", true);
configuration.setProperty("hibernate.cache.region.factory_class","org.hibernate.cache.EhCacheRegionFactory");
configuration.setProperty("hibernate.current_session_context_class","thread" );
configuration.setProperty("hibernate.connection.driver_class", config.mysqlConfig.jdbcDriver);
configuration.setProperty("hibernate.connection.url", "jdbc:mysql://"
+ host + ":" + port + "/" + database+"?useUnicode=true&characterEncoding=UTF-8");
configuration.setProperty("hibernate.connection.username", user);
configuration.setProperty("hibernate.connection.password", password);
configuration.setProperty("hibernate.show_sql", "true");
configuration.setProperty("hibernate.hbm2ddl.auto", "validate");
final Reflections reflections = new Reflections("org.prosolo.common.domainmodel");
final Set<Class<?>> classes = reflections.getTypesAnnotatedWith(Entity.class);
for (final Class<?> clazz : classes) {
configuration.addAnnotatedClass(clazz);
}
ServiceRegistry serviceRegistry
= new StandardServiceRegistryBuilder()
.applySettings(configuration.getProperties()).build();
// builds a session factory from the service registry
sessionFactory = configuration.buildSessionFactory(serviceRegistry);
}
return sessionFactory;
}
更新 4(解决方案):
最后发现是Hibernate数据源配置的问题,改成tomcat DataSource解决了。除了之前的配置外,我还添加了以下内容:
StandardServiceRegistryBuilder serviceRegistryBuilder
= new StandardServiceRegistryBuilder();
serviceRegistryBuilder.applySetting(Environment.DATASOURCE, dataSource());
ServiceRegistry serviceRegistry= serviceRegistryBuilder.applySettings(configuration.getProperties()).build();
sessionFactory = configuration.buildSessionFactory(serviceRegistry);
/////
public static DataSource dataSource() {
MySQLConfig mySQLConfig=CommonSettings.getInstance().config.mysqlConfig;
String username = mySQLConfig.user;
String password = mySQLConfig.password;
String host = mySQLConfig.host;
int port = mySQLConfig.port;
String database = mySQLConfig.database;
String url="jdbc:mysql://"+ host + ":" + port + "/" + database;
PoolProperties p = new PoolProperties();
p.setUrl(url+"?useUnicode=true&characterEncoding=UTF-8");
p.setDriverClassName(CommonSettings.getInstance().config.mysqlConfig.jdbcDriver);
p.setUsername(username);
p.setPassword(password);
p.setJmxEnabled(false);
p.setTestWhileIdle(false);
p.setTestOnBorrow(true);
p.setValidationQuery("SELECT 1");
p.setTestOnReturn(false);
p.setValidationInterval(30000);
p.setTimeBetweenEvictionRunsMillis(1000);
p.setMaxActive(100);
p.setInitialSize(10);
p.setMaxWait(10000);
p.setRemoveAbandonedTimeout(60);
p.setMinEvictableIdleTimeMillis(30000);
p.setMinIdle(10);
p.setLogAbandoned(true);
if(CommonSettings.getInstance().config.rabbitMQConfig.distributed){
p.setRemoveAbandoned(false);
}else{
p.setRemoveAbandoned(true);
}
p.setJdbcInterceptors(
"org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;"
+ "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;"
+ "org.apache.tomcat.jdbc.pool.interceptor.ResetAbandonedTimer");
org.apache.tomcat.jdbc.pool.DataSource ds = new org.apache.tomcat.jdbc.pool.DataSource();
ds.setPoolProperties(p);
return ds;
}
最佳答案
你并没有真正利用 contextual session在这里:
public DiggestGeneratorDAOImpl(){
setSession(HibernateUtil.getSessionFactory().openSession());
}
session 旨在绑定(bind)到请求或扩展上下文,但在您的用例中,每个 DAO 实例都创建自己的 session 。
如果您使用两个单独的 DAO 实例,例如 DiggestGeneratorDAOImpl
,那么它们将使用一个单独的 session 、一个单独的事务和一个单独的数据库连接。这可能很棘手,因为只有在提交后,其他交易才能看到来自一个交易的机会。
阻塞可能是由两个并发连接/事务之间的死锁引起的。
您需要重构代码,以便业务方法在所有 DAO 调用之间共享相同的 Session/Transaction/Connection。
关于java - 使用 Apache Spark 的 Hibernate 持久化导致进程阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32419959/