我使用 Celery 运行网络蜘蛛来抓取一些数据,之后我需要将此数据保存在数据库中的某个位置(例如 SQLite),但据我所知,我无法在 Celery 工作人员之间共享 SQLAlchemy session 。你怎么解决这个问题?哪种方式比较常见?
目前我正在尝试使用Redis作为数据的中间存储。
@celery.task
def run_spider(spider, task):
# setup worker
logger = logging.getLogger('Spider: %s' % spider.url)
spider.meta.update({'logger': logger, 'task_id': int(task.id)})
# push task data inside worker
spider.meta.update({'task_request': run_spider.request})
spider.run()
task.state = "inactive"
task.resolved = datetime.datetime.now()
db.session.add(task)
db.session.commit()
编辑:实际上我错了,我不需要共享 session ,我需要为每个 celery 进程/任务创建新的数据库连接。
最佳答案
我也曾在大型 celery 应用程序中使用 redis 进行持久化。
我的任务通常如下所示:
@task
def MyTask(sink, *args, **kwargs):
data_store = sharded_redis.ShardedRedis(sink)
key_helper = helpers.KeyHelper()
my_dictionary = do_work()
data_store.hmset(key_helper.key_for_my_hash(), my_dictionary)
sharded_redis
只是通过客户端处理分片键的多个 Redis 分片的抽象。sink
是一个(host, port)
元组列表,用于在确定分片后建立适当的连接。
本质上,您是在每个任务中连接和断开与 Redis 的连接(非常便宜),而不是创建连接池。
使用连接池是可行的,但如果你要真正利用 celery (运行大量并发任务),那么你最好(在我看来)使用这种方法,因为你面临着连接耗尽的风险池,特别是当您在 Redis 中执行任何需要较长时间的操作时(例如将大型数据集读入内存)。
与 Redis 的连接非常便宜,因此应该可以很好地扩展。我们在几个实例上每分钟处理数十万个任务。
关于python - 在 celery 工作人员内部存储数据的常见且明显的方式是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11876782/