sqlalchemy - 在 celery 任务中反射(reflect) SQLAlchemy 元数据?

标签 sqlalchemy celery

为了更好的可测试性和其他原因,最好让 SQLAlchemy 数据库 session 配置为非全局的,如以下问题中所述:

how to setup sqlalchemy session in celery tasks with no global variable (也在 https://github.com/celery/celery/issues/3561 中讨论过)

现在的问题是,如何优雅地处理元数据?如果我的理解是正确的,元数据可以拥有一次,例如:

engine = create_engine(DB_URL, encoding='utf-8', pool_recycle=3600,
                       pool_size=10)
# db_session = get_session()  # this is old global session
meta = MetaData()
meta.reflect(bind=engine)

由于性能原因,反射(reflect)每个任务执行并不好,元数据或多或少是稳定和线程安全的结构(如果我们只读取它)。

但是,元数据有时会发生变化(celery 不是 db 模式的“所有者”),从而导致工作人员出错。

处理 meta 的优雅方式可能是什么?以可测试的方式,再加上仍然能够对底层数据库更改使用react? (使用中的蒸馏器,如果相关)。

我正在考虑使用 alembic 版本更改作为重新反射(reflect)的信号,但不太确定如何使它在 celery 中很好地工作。例如,如果不止一个工作人员会同时感知到变化,则全局 meta可以以非线程安全的方式处理。

如果重要的话,在这种情况下使用 celery 是独立的,没有 web 框架模块/应用程序/celery 应用程序中存在的任何内容。这个问题也被简化了,因为只有 SQLAlchemy Core 正在使用,而不是对象映射器。

最佳答案

这只是部分解决方案,它适用于 SQLAlchemy ORM(但我想类似的东西很容易为 Core 实现)。

要点:

  • 引擎处于模块级别,但配置(访问 URL、参数)来自 os.environ
  • session 在它自己的工厂函数中
  • 在模块级别:BaseModel = automap_base()然后表类使用该 BaseModel 作为父类(super class),通常只有一个参数 - __tablename__ ,但可以在此处添加任意关系、属性(非常类似于正常的 ORM 使用)
  • 在模块级别:BaseModel.prepare(ENGINE, reflect=True)

  • 测试(使用 pytest)在 DB_URL 中注入(inject)环境变量(例如 conftest.py )在模块级别。

    一个重要时刻:database_session总是在任务函数中启动(即调用工厂函数),并显式传播到所有函数中。这种方式允许自然地控制工作单元,通常每个任务一个事务。这也简化了测试,因为所有使用数据库的功能都可以提供假的或真实的(测试)数据库 session 。

    “任务函数”就是上面是一个函数,在函数中调用,被任务修饰——这样任务函数可以在没有任务机器的情况下进行测试。

    这只是部分解决方案,因为不存在重做反射。如果可以暂时停止任务工作人员(并且由于架构更改而数据库无论如何都会遇到停机),因为这些通常是后台任务,所以它不会造成问题。 worker 也可以通过一些外部看门狗重新启动,它可以监控数据库的变化。这可以通过使用 supervisord 或其他方式来控制在前台运行的 celery worker 来实现。

    总而言之,在我解决了上述问题之后,我更加重视“显式胜于隐式”的哲学。所有那些神奇的“应用程序”,无论是在 celery 还是 Flask 中的“请求”,都可能在函数签名中带来微小的缩写,但我宁愿在调用链中传递某种上下文,以提高可测试性和更好的上下文理解和管理。

    关于sqlalchemy - 在 celery 任务中反射(reflect) SQLAlchemy 元数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52570671/

    相关文章:

    python - 安排 Celery 任务以最大限度提高员工生产力的正确方法是什么?

    django - 在 Celery 任务中运行 Scrapy 蜘蛛(django 项目)

    Django 和 Celery - 更改后将代码重新加载到 Celery 中

    python - Flask/SQLAlchemy - 创建具有关系的模型时出错

    python - 模型中的 SQLAlchemy 自定义本地属性

    celery - 是否可以为 celery 的 Canvas 基元使用自定义路由?

    sqlalchemy - 从 Airflow 连接 postgres 数据库时出错

    python - 警告 : Data truncated for column 'src_address' at row 1

    sql - 比较同一组 SQL 中的 2 个时间段

    python - SQLAlchemy 设置默认 nullable=False