python-3.x - 调度 Airflow TaskGroup抛出AttributeError

标签 python-3.x airflow python-3.8 airflow-2.x

因此,我正在任务组中创建任务,并尝试将它们添加到我的 dag 任务序列中,但它抛出此错误:

Broken DAG: [/Users/abc/projects/abc/airflow_dags/dag.py] Traceback (most recent call last):
  File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1234, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False)
  File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'NoneType' object has no attribute 'update_relative'

我正在创建我的任务组和任务,如下所示:

def get_task_group(dag, task_group):
    t1 = DummyOperator(task_id='t1', dag=dag, task_group=task_group)
    t2 = DummyOperator(task_id='t2', dag=dag, task_group=task_group)
    t3 = DummyOperator(task_id='t3', dag=dag, task_group=task_group)
    t4 = DummyOperator(task_id='t4', dag=dag, task_group=task_group)
    t5 = DummyOperator(task_id='t5', dag=dag, task_group=task_group)
    t_list = [t2, t3, t4]
    t1.set_downstream(t_list)
    t5.set_upstream(t_list)


with DAG('some_dag', default_args=args) as dag:
    with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
      run_model_task_group = get_task_group(dag, tg)

    a1 = DummyOperator(task_id='a1', dag=dag)
    a2 = DummyOperator(task_id='a2', dag=dag)
    a3 = DummyOperator(task_id='a3', dag=dag)
    a4 = DummyOperator(task_id='a4', dag=dag)

    a1.set_downstream(a2)
    a2.set_downstream(run_model_task_group)
    a3.set_upstream(run_model_task_group)
    a3.set_downstream(a4)

如果我删除任务组并通过删除行来使任务组任务不再排序

a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)

我可以看到 a1、a2 a3 和 a4 已正确排序,并且我可以断开连接的 run_model_task_group 任务,但是一旦将其添加到序列中,我就会收到上述错误。

谁能指导我这里可能发生什么? enter image description here

请注意,我使用采用 dagtask_group 参数的函数来创建任务组任务,因为我想为另一个 dag 创建相同的任务集也是。

Python Version: 3.8.8
Airflow Version: 2.0.1

最佳答案

AttributeError:'NoneType'对象没有属性'update_relative'发生这种情况是因为run_model_task_groupNone超出了的范围>With block ,这是 Python 的预期行为。

无需对迄今为止所做的事情进行太多更改,您可以重构 get_task_group() 以返回 TaskGroup 对象,如下所示:

def get_task_group(dag, group_id):

    with TaskGroup(group_id=group_id, dag=dag) as tg:
        t1 = DummyOperator(task_id='t1', dag=dag)
        t2 = DummyOperator(task_id='t2', dag=dag)
        t3 = DummyOperator(task_id='t3', dag=dag)
        t4 = DummyOperator(task_id='t4', dag=dag)
        t5 = DummyOperator(task_id='t5', dag=dag)
        t_list = [t2, t3, t4]
        t1.set_downstream(t_list)
        t5.set_upstream(t_list)
    return tg

在 DAG 定义中只需使用以下方式调用它:

run_model_task_group = get_task_group(dag, "run_model_tasks")

生成的图形 View 如下所示:

graph_view

DAG 定义:

with DAG('some_dag',
         default_args=default_args,
         start_date=days_ago(2),
         schedule_interval='@once') as dag:
    # with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
    #     run_model_task_group = get_task_group(dag, )
    run_model_task_group = get_task_group(dag, "run_model_tasks")
    a1 = DummyOperator(task_id='a1', dag=dag)
    a2 = DummyOperator(task_id='a2', dag=dag)
    a3 = DummyOperator(task_id='a3', dag=dag)
    a4 = DummyOperator(task_id='a4', dag=dag)

    a1.set_downstream(a2)
    a2.set_downstream(run_model_task_group)
    a3.set_upstream(run_model_task_group)
    a3.set_downstream(a4)

最后,考虑使用按位运算符代替 set_downstreamset_upstream,这是推荐的方式,而且也比较简洁 source here .

请告诉我这是否对您有用。

测试使用:Airflow 版本:2.1.4、Python 3.8.10

关于python-3.x - 调度 Airflow TaskGroup抛出AttributeError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69766838/

相关文章:

python - 我无法使用 matplotlib 在 colab 上绘图

Airflow - 如何重新加载操作符和插件

python - 在Python中按多个子字符串对字符串进行排序

python - 如何匹配时间数据 ' 01/27/1998' 到 '%d/%m/%y' 之间的格式?

python - 无法使用 set 剔除重复结果

python - 随机日期和月份,但保留年份和时间间隔

Python subprocess.check_call( ["wine"]..) 存在同步问题

使用 Airflow 测试与使用 DebugExecutor 调试 Airflow 任务

Airflow 插件未正确拾取

python - 如何修复 'pygame.error: Couldn' t 打开 .png' 错误