airflow - 获取所有 Airflow 叶节点/任务

标签 airflow

我想构建一些我需要捕获所有叶任务并向它们添加下游依赖项以在我们的数据库中完成作业的东西。有没有一种简单的方法可以在 Airflow 中找到 DAG 的所有叶节点?

最佳答案

使用upstream_task_idsdownstream_task_ids @property from BaseOperator

def get_start_tasks(dag: DAG) -> List[BaseOperator]:
    # returns list of "head" / "root" tasks of DAG
    return [task for task in dag.tasks if not task.upstream_task_ids]


def get_end_tasks(dag: DAG) -> List[BaseOperator]:
    # returns list of "leaf" tasks of DAG
    return [task for task in dag.tasks if not task.downstream_task_ids]

Type-Annotations来自 Python 3.6+


UPDATE-1

现在 Airflow DAG model有强大@property functions喜欢

关于airflow - 获取所有 Airflow 叶节点/任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43529948/

相关文章:

python - 无法从 flower.command 导入名称 FlowerCommand

airflow - Airflow DAG运行已触发,但从未执行?

python - Airflow/Composer - 在 zip 打包的 DAG 中找不到模板

sqlalchemy - psycopg2.操作错误: could not translate host name "<address>" to address: Temporary failure in name resolution

airflow - 如何从 Apache Airflow 触发 azure Databricks notebook

google-cloud-platform - 我们可以将值传递到正在运行的 GCP Cloud Composer Pipeline 吗?

google-cloud-platform - Google Cloud Composer(Airflow) - DAG 内的数据流作业成功执行,但 DAG 失败

amazon-web-services - 如何使用 BashOprator 在 Airflow 中使用 Airflow AWS 连接凭证将文件从 AWS s3 存储桶传输到 GCS

python - 设置 Postgres celery result_backend 时 Airflow 调度程序崩溃

amazon-web-services - Amazon MWAA Airflow - 任务容器在没有日志的情况下关闭/停止/终止