cities = DBSession.query(City).filter(City.big=='Y').options(joinedload(City.hash)).limit(1)
t0 = time.time()
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)
for kw_status in keyword_statuses:
kw_status.status = 1
DBSession.commit()
t0 = time.time()
w = SWorker(threads_no=1, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')
w.work()
print 'finished'
上面的代码使用 select for update 从表中选择一个关键字状态。 它会锁定该行,直到该行被更新。
如您所见,我更新了该行并提交了更改。
kw_status.status = 1
DBSession.commit()
之后,我创建了一个 SWorker 对象,它将任务放入队列中并创建了一个 处理队列的线程数(为简单起见,这里只有一个)。
工作人员完成处理后更新
kw_status.status = 2
DBSession.commit()
此时我得到一个异常
(1205, 'Lock wait timeout exceeded; try restarting transaction') 'UPDATE g_search_keyword_status SET status=%s WHERE g_search_keyword_status.keyword_id = %s' (2, 10000001L)
所以看起来该行被锁定了。但在我开始工作之前,我已经更新了 status 为 1,我已提交更改,因此应解锁该行。
我也使用 scoped_session
DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=engine
)
)
最佳答案
问题是延迟加载。注意事项
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)
for kw_status in keyword_statuses:
kw_status.status = 1
DBSession.commit()
上面的代码并没有将结果加载到内存中,而是仅在实际需要时加载。
在我的 SWorker 中,我循环遍历 keywordStatus.keyword 对象,因此表为每个线程锁定。
当我使用 all() 将结果加载到内存时问题解决了
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1).all()
关于python - mysql 和 sqlalchemy with_lockmode ('update' ) 它是如何工作的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11261956/