因此,我正在任务组中创建任务,并尝试将它们添加到我的 dag 任务序列中,但它抛出此错误:
Broken DAG: [/Users/abc/projects/abc/airflow_dags/dag.py] Traceback (most recent call last):
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1234, in set_downstream
self._set_relatives(task_or_task_list, upstream=False)
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'NoneType' object has no attribute 'update_relative'
我正在创建我的任务组和任务,如下所示:
def get_task_group(dag, task_group):
t1 = DummyOperator(task_id='t1', dag=dag, task_group=task_group)
t2 = DummyOperator(task_id='t2', dag=dag, task_group=task_group)
t3 = DummyOperator(task_id='t3', dag=dag, task_group=task_group)
t4 = DummyOperator(task_id='t4', dag=dag, task_group=task_group)
t5 = DummyOperator(task_id='t5', dag=dag, task_group=task_group)
t_list = [t2, t3, t4]
t1.set_downstream(t_list)
t5.set_upstream(t_list)
with DAG('some_dag', default_args=args) as dag:
with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
run_model_task_group = get_task_group(dag, tg)
a1 = DummyOperator(task_id='a1', dag=dag)
a2 = DummyOperator(task_id='a2', dag=dag)
a3 = DummyOperator(task_id='a3', dag=dag)
a4 = DummyOperator(task_id='a4', dag=dag)
a1.set_downstream(a2)
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
a3.set_downstream(a4)
如果我删除任务组并通过删除行来使任务组任务不再排序
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
我可以看到 a1、a2 a3 和 a4 已正确排序,并且我可以断开连接的 run_model_task_group
任务,但是一旦将其添加到序列中,我就会收到上述错误。
谁能指导我这里可能发生什么?
请注意,我使用采用 dag
和 task_group
参数的函数来创建任务组任务,因为我想为另一个 dag 创建相同的任务集也是。
Python Version: 3.8.8
Airflow Version: 2.0.1
最佳答案
AttributeError:'NoneType'对象没有属性'update_relative'
发生这种情况是因为run_model_task_group
其None
超出了的范围>With
block ,这是 Python 的预期行为。
无需对迄今为止所做的事情进行太多更改,您可以重构 get_task_group()
以返回 TaskGroup
对象,如下所示:
def get_task_group(dag, group_id):
with TaskGroup(group_id=group_id, dag=dag) as tg:
t1 = DummyOperator(task_id='t1', dag=dag)
t2 = DummyOperator(task_id='t2', dag=dag)
t3 = DummyOperator(task_id='t3', dag=dag)
t4 = DummyOperator(task_id='t4', dag=dag)
t5 = DummyOperator(task_id='t5', dag=dag)
t_list = [t2, t3, t4]
t1.set_downstream(t_list)
t5.set_upstream(t_list)
return tg
在 DAG 定义中只需使用以下方式调用它:
run_model_task_group = get_task_group(dag, "run_model_tasks")
生成的图形 View 如下所示:
DAG 定义:
with DAG('some_dag',
default_args=default_args,
start_date=days_ago(2),
schedule_interval='@once') as dag:
# with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
# run_model_task_group = get_task_group(dag, )
run_model_task_group = get_task_group(dag, "run_model_tasks")
a1 = DummyOperator(task_id='a1', dag=dag)
a2 = DummyOperator(task_id='a2', dag=dag)
a3 = DummyOperator(task_id='a3', dag=dag)
a4 = DummyOperator(task_id='a4', dag=dag)
a1.set_downstream(a2)
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
a3.set_downstream(a4)
最后,考虑使用按位运算符代替 set_downstream
和 set_upstream
,这是推荐的方式,而且也比较简洁 source here .
请告诉我这是否对您有用。
测试使用:Airflow 版本:2.1.4、Python 3.8.10
关于python-3.x - 调度 Airflow TaskGroup抛出AttributeError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69766838/