我最近经常使用 Airflow,发现一个非常常见的模式是循环某些集合来创建多个任务。与 example_python_operator.py 非常相似dag 在 github 的示例 dags 文件夹中找到。
我的问题与动态构建循环迭代的集合有关。假设您想要为数据库中存储的一组未知客户端中的每一个创建一个任务,并且您计划查询它们作为填充列表的方法。像这样的事情:
first_task = PythonOperator(
task_id='some_upstream_task',
provide_context=True,
python_callable=some_upstream_task,
dag=dag)
clients = my_database_query()
for client in clients:
task = PythonOperator(
task_id='client_' + str(client),
python_callable=some_function,
dag=dag)
task.set_upstream(first_task)
据我所知,这意味着即使您的 dag 仅每周运行一次,您的数据库也会每 30 秒轮询一次以查找这些客户端。即使您从迭代器设置上游运算符并通过 xcoms 返回客户端并用 xcom_pull()
替换 my_database_query()
,您仍然每 30 秒轮询一次 xcoms。这对我来说似乎很浪费,所以我想知道这种类型的 dag 是否有更好的模式?
最佳答案
在您的代码示例中,我们没有看到 DAG 的计划间隔,但我假设您已经安排了它,比如说 @daily
,并且您希望运行数据库查询每天一次。
在 Airflow 中,调度程序会定期解析 DAG(因此称为“每 30 秒”)。所以你的Python代码会导致一个问题。
在你的情况下,我会考虑改变视角:为什么不尝试在 PosgresOperator link 中运行数据库查询然后将其作为 DAG 的一部分?根据该 Operator 的输出(例如,您可以通过 XCOM 或通过对象存储中的文件进行传播),您可以拥有一个下游的 PythonOperator,它不会为一个客户端运行函数,而是为所有客户端运行函数。
关于python - 动态构建集合以在 Airflow dag 中循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49139821/