我试图在 subdag 创建时访问来自父 dag 的一些 xcom 数据,我正在寻找在互联网上实现这一目标,但我没有找到任何东西。
def test(task_id):
logging.info(f' execution of task {task_id}')
def load_subdag(parent_dag_id, child_dag_id, args):
dag_subdag = DAG(
dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
default_args=args,
schedule_interval="@daily",
)
with dag_subdag:
r = DummyOperator(task_id='random')
for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
t = PythonOperator(
task_id='load_subdag_{0}'.format(i),
default_args=args,
python_callable=print_context,
op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
dag=dag_subdag,
)
return dag_subdag
load_tasks = SubDagOperator(
task_id='load_tasks',
subdag=load_subdag(dag.dag_id,
'load_tasks', args),
default_args=args,
)
我的代码出现此错误
1 | Traceback (most recent call last):
airflow_1 | File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1 | m = imp.load_source(mod_name, filepath)
airflow_1 | File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1 | module = _load(spec)
airflow_1 | File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1 | File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1 | File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1 | File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1 | File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1 | 'load_tasks', args),
airflow_1 | File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1 | for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1 | TypeError: xcom_pull() missing 1 required positional argument: 'context'
最佳答案
错误很简单:您缺少 context
方法所需的 xcom_pull()
参数。但是你真的不能只创建 context
来传递给这个方法;它是一个 Python
字典,Airflow
传递给 anchor 方法,如 pre_execute()
的 execute()
and BaseOperator
(所有 Operator
的父类)。
换句话说, context
仅在 Operator
实际执行时才可用 ,而不是在 DAG
-definition 期间。这是有道理的,因为在 Airflow
的分类中, xcom
s 是 task
s 之间的实时通信机制:它们在运行时相互交谈。
但最终 Xcom
s,就像所有其他 Airflow
模型一样,被持久化在后端元数据库中。所以当然你可以直接从那里检索它(显然只有过去运行过的 task
的XCOM)。虽然我没有代码片段,但您可以查看 cli.py
,他们使用 SQLAlchemy
ORM 来处理模型和后端数据库。请理解,这意味着每次解析 DAG
-definition 文件时都会向您的后端数据库触发查询,这发生得相当快。
有用的链接
编辑-1
看了你的代码片段后,我感到震惊。假设
xcom_pull()
返回的值会经常变化,那么 task
中 dag
的数量也会不断变化。这可能会导致 不可预测的行为 (你应该做一些研究,但我对此感觉不太好)我建议你重新审视你的整个任务工作流程并浓缩到一个设计中
-
task
的数量和-
DAG
的结构提前知道(在执行 dag 定义文件时)。您当然可以迭代
json
文件/SQL
查询的结果(如前面提到的 SQLAlchemy
事情)等,以生成您的实际 task
s,但该文件/db/任何不应该经常更改。请理解,仅仅迭代一个列表来生成
task
是没有问题的;不可能的是让 DAG
的结构依赖于 upstream
task
的结果。例如,您不能根据上游任务在运行时计算 n 的值,在您的 task
中创建 n DAG
。所以这是不可能的
但这是可能的(包括您想要实现的目标;即使您这样做的方式似乎不是一个好主意)
编辑-2
事实证明,从上游任务的输出生成任务毕竟是可能的;尽管它需要大量有关 Airflow 内部工作原理的知识以及一点创造力。
关于airflow - 在 Airflow 中创建子标签时访问父 dag 上下文?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54745555/