我的应用程序从用户接收一个或多个 URL(通常为 3-4 个 URL),从这些 URL 中抓取某些数据并将这些数据写入数据库。但是,因为抓取这些数据需要一点时间,所以我正在考虑在单独的线程中运行每个抓取,以便抓取+写入数据库可以继续在后台进行,这样用户就不必继续等待。
为了实现这一点,我有(仅相关部分):
@view_config(route_name="add_movie", renderer="templates/add_movie.jinja2")
def add_movie(request):
post_data = request.POST
if "movies" in post_data:
movies = post_data["movies"].split(os.linesep)
for movie_id in movies:
movie_thread = Thread(target=store_movie_details, args=(movie_id,))
movie_thread.start()
return {}
def store_movie_details(movie_id):
movie_details = scrape_data(movie_id)
new_movie = Movie(**movie_details) # Movie is my model.
print new_movie # Works fine.
print DBSession.add(movies(**movie_details)) # Returns None.
虽然 new_movie
行确实打印了正确的报废数据,但 DBSession.add()
不起作用。事实上,它只是返回 None
。
如果我删除线程并仅调用方法store_movie_details()
,它就可以正常工作。
发生什么事了?
最佳答案
首先,SA 文档 Session.add()不要提及任何有关该方法的返回值的内容,因此我假设它应该返回 None
。
其次,我认为您的意思是将 new_movie
添加到 session 中,而不是 movies(**movie_details)
,无论它是什么:)
第三,标准 Pyramid session (使用 ZopeTransactionExtension 配置的 session )与 Pyramid 的请求-响应周期相关联,这可能会在您的情况下产生意外的行为。您需要配置一个单独的 session ,您需要在 store_movie_details
中手动提交该 session 。本次 session 需要使用scoped_session因此 session 对象是线程本地的,并且不跨线程共享。
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
session_factory = sessionmaker(bind=some_engine)
AsyncSession = scoped_session(session_factory)
def store_movie_details(movie_id):
session = AsyncSession()
movie_details = scrape_data(movie_id)
new_movie = Movie(**movie_details) # Movie is my model.
session.add(new_movie)
session.commit()
当然,这种方法仅适用于非常轻量级的任务,并且如果您不介意偶尔丢失任务(例如,当网络服务器重新启动时)。对于任何更严重的事情,请看看 celery 等,正如 Antoine Leclair 建议的那样。
关于python - Pyramid:多线程数据库操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24705323/