python - 在 Airflow CLI 中测试 DAG 时出现 DagNotFound 错误

标签 python airflow directed-acyclic-graphs

我收到错误:airflow.exceptions.DagNotFound: Dag id test_task not found in DagModel当试图通过 airflow trigger_dag test_dag 运行 dag 时.

DAG 在运行时正确列出 airflow list_dags .我还检查以确保 $AIRFLOW_HOME 目录正确设置为 dag 所在的位置。让它工作的唯一方法是运行特定任务,例如 airflow test test_dag test_task .正在运行 python dags/test_dag.py没有显示错误。

导入后 dag 文件本身的代码:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['my@email.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
  dag_id='test_dag'
  default_args=default_args, 
  schedule_interval=timedelta(days=1)
)

最佳答案

我之前在添加新 DAG(在新文件中)时遇到过这个问题。不确定这是否有帮助,但基本上我认为发生这种情况是因为 list_dags 导致 Airflow 查找 DAG 并列出它们,但是当您“触发”DAG 时,它会告诉调度程序在 DAG 中查找 test_dag 知道 - 它可能(还)不知道这个,因为它是新的。

如果您有权访问 Web UI,您可能会注意到当您收到此错误时 DAG 不存在——点击 DAG 刷新按钮会导致新的 DAG 出现。我敢肯定有一个很棒的解决方案可以监视目录或其他内容并在它更改时引起此刷新,但是在 airflow.cfg 中您可以设置刷新率。

关于python - 在 Airflow CLI 中测试 DAG 时出现 DagNotFound 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57897418/

相关文章:

python - 如何使用 keras-self-attention 包可视化注意力 LSTM?

Sun Grid Engine 上的 Python MapReduce

重命名任务后 Airflow dag 卡住

python - Python 中的组合学

Python:用字符串中的整数替换 "wrong" float

python - 如何逐列迭代 pandas 数据框,一次返回一个项目

java - 如何在 Airflow 中运行 Spark 代码?

algorithm - 在有向无环图中寻找最长路径

Airflow 任务因 "Scheduler heartbeat got an exception"错误而被终止

etl - Airflow 保留相同的数据库连接吗?