为了更好的可测试性和其他原因,最好让 SQLAlchemy 数据库 session 配置为非全局的,如以下问题中所述:
how to setup sqlalchemy session in celery tasks with no global variable (也在 https://github.com/celery/celery/issues/3561 中讨论过)
现在的问题是,如何优雅地处理元数据?如果我的理解是正确的,元数据可以拥有一次,例如:
engine = create_engine(DB_URL, encoding='utf-8', pool_recycle=3600,
pool_size=10)
# db_session = get_session() # this is old global session
meta = MetaData()
meta.reflect(bind=engine)
由于性能原因,反射(reflect)每个任务执行并不好,元数据或多或少是稳定和线程安全的结构(如果我们只读取它)。
但是,元数据有时会发生变化(celery 不是 db 模式的“所有者”),从而导致工作人员出错。
处理
meta
的优雅方式可能是什么?以可测试的方式,再加上仍然能够对底层数据库更改使用react? (使用中的蒸馏器,如果相关)。我正在考虑使用 alembic 版本更改作为重新反射(reflect)的信号,但不太确定如何使它在 celery 中很好地工作。例如,如果不止一个工作人员会同时感知到变化,则全局
meta
可以以非线程安全的方式处理。如果重要的话,在这种情况下使用 celery 是独立的,没有 web 框架模块/应用程序/celery 应用程序中存在的任何内容。这个问题也被简化了,因为只有 SQLAlchemy Core 正在使用,而不是对象映射器。
最佳答案
这只是部分解决方案,它适用于 SQLAlchemy ORM(但我想类似的东西很容易为 Core 实现)。
要点:
os.environ
BaseModel = automap_base()
然后表类使用该 BaseModel 作为父类(super class),通常只有一个参数 - __tablename__
,但可以在此处添加任意关系、属性(非常类似于正常的 ORM 使用)BaseModel.prepare(ENGINE, reflect=True)
测试(使用 pytest)在
DB_URL
中注入(inject)环境变量(例如 conftest.py
)在模块级别。一个重要时刻:
database_session
总是在任务函数中启动(即调用工厂函数),并显式传播到所有函数中。这种方式允许自然地控制工作单元,通常每个任务一个事务。这也简化了测试,因为所有使用数据库的功能都可以提供假的或真实的(测试)数据库 session 。“任务函数”就是上面是一个函数,在函数中调用,被任务修饰——这样任务函数可以在没有任务机器的情况下进行测试。
这只是部分解决方案,因为不存在重做反射。如果可以暂时停止任务工作人员(并且由于架构更改而数据库无论如何都会遇到停机),因为这些通常是后台任务,所以它不会造成问题。 worker 也可以通过一些外部看门狗重新启动,它可以监控数据库的变化。这可以通过使用 supervisord 或其他方式来控制在前台运行的 celery worker 来实现。
总而言之,在我解决了上述问题之后,我更加重视“显式胜于隐式”的哲学。所有那些神奇的“应用程序”,无论是在 celery 还是 Flask 中的“请求”,都可能在函数签名中带来微小的缩写,但我宁愿在调用链中传递某种上下文,以提高可测试性和更好的上下文理解和管理。
关于sqlalchemy - 在 celery 任务中反射(reflect) SQLAlchemy 元数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52570671/