python - 动态构建集合以在 Airflow dag 中循环

标签 python airflow

我最近经常使用 Airflow,发现一个非常常见的模式是循环某些集合来创建多个任务。与 example_python_operator.py 非常相似dag 在 github 的示例 dags 文件夹中找到。

我的问题与动态构建循环迭代的集合有关。假设您想要为数据库中存储的一组未知客户端中的每一个创建一个任务,并且您计划查询它们作为填充列表的方法。像这样的事情:

first_task = PythonOperator(
    task_id='some_upstream_task',
    provide_context=True,
    python_callable=some_upstream_task,
    dag=dag)

clients = my_database_query()

for client in clients:
    task = PythonOperator(
        task_id='client_' + str(client),
        python_callable=some_function,
        dag=dag)

    task.set_upstream(first_task)

据我所知,这意味着即使您的 dag 仅每周运行一次,您的数据库也会每 30 秒轮询一次以查找这些客户端。即使您从迭代器设置上游运算符并通过 xcoms 返回客户端并用 xcom_pull() 替换 my_database_query() ,您仍然每 30 秒轮询一次 xcoms。这对我来说似乎很浪费,所以我想知道这种类型的 dag 是否有更好的模式?

最佳答案

在您的代码示例中,我们没有看到 DAG 的计划间隔,但我假设您已经安排了它,比如说 @daily,并且您希望运行数据库查询每天一次。

在 Airflow 中,调度程序会定期解析 DAG(因此称为“每 30 秒”)。所以你的Python代码会导致一个问题。

在你的情况下,我会考虑改变视角:为什么不尝试在 PosgresOperator link 中运行数据库查询然后将其作为 DAG 的一部分?根据该 Operator 的输出(例如,您可以通过 XCOM 或通过对象存储中的文件进行传播),您可以拥有一个下游的 PythonOperator,它不会为一个客户端运行函数,而是为所有客户端运行函数。

关于python - 动态构建集合以在 Airflow dag 中循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49139821/

相关文章:

Airflow 以编程方式取消暂停 dag?

python - python中3个类之间的继承

python - 使用 Airflow MySqlOperator 时插入查询中的语法错误

python - 在 Python 的 matplotlib 中显示彩色二维数组

python - 条件if语句函数的最佳方法?

python - Airflow、XCom 和多个 task_id

airflow - 以不同的频率运行dags |空 Airflow 动

Airflow Scheduler 不断崩溃,数据库连接错误(Google Composer)

android - 如何解决命令 "python systrace.py --set-tags gfx,view,wm"中的错误?

python - 如何检索先前命令的输出并将其保存在 Python 交互式 shell 中的变量中?