Airflow DAG - 失败的任务没有显示应有的失败状态

标签 airflow webserver

我刚刚开始使用 Airflow DAG,并遇到了该工具的一个奇怪问题。我正在使用 Airflow 版本 2.3.3 和 SequentialExecutor。

我使用的脚本:

import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

dag_args = {
    'owner': 'hao',
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=1)
}

with DAG(
        dag_id='dependency_experiment',
        default_args=dag_args,
        description='experiment the dag task denpendency expression',
        start_date=datetime.datetime.now(),
        schedule_interval='@daily',
        dagrun_timeout=datetime.timedelta(seconds=10),
) as dag:

    pyOp = PythonOperator(
        task_id='pyOp',
        python_callable=lambda x: haha * x,
        op_kwargs={'x': 10}
    )

    pyOp

此任务的日志片段:

NameError: name 'haha' is not defined

[2022-07-27, 18:19:34 EDT] {taskinstance.py:1415} INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220728T021932, start_date=20220728T021934, end_date=20220728T021934

[2022-07-27, 18:19:34 EDT] {standard_task_runner.py:92} ERROR - Failed to execute job 44 for task pyOp (name 'haha' is not defined; 19405)

[2022-07-27, 18:19:34 EDT] {local_task_job.py:156} INFO - Task exited with return code 1

[2022-07-27, 18:19:34 EDT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

问题: 我故意定义了一个PythonOperator,但它会失败。当我将脚本放在 DAG 上时,任务按预期引发了异常;但是,此任务的状态始终是已跳过。我无法弄清楚为什么该任务没有按预期显示失败状态。任何建议将不胜感激。

最佳答案

这是因为您在 dag_args 字典中定义了 'retries''retry_delay'

From the Docs:

default_args (Optional[Dict]) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains ‘depends_on_past’: True here and ‘depends_on_past’: False in the operator’s call default_args, the actual value will be False.

当您将'retries'设置为某个值时,Airflow 会认为该任务将在其他时间重试。因此它在 UI 中显示为已跳过。

如果您从 dag_args 中删除 'retries''retry_delay',当您尝试时,您会看到该任务设置为失败启动 DAG。


当我在日志中运行您的代码时,我看到:

INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T060953, start_date=20220729T060953, end_date=20220729T060953

删除'retries''retry_delay'后,相同的日志变为:

INFO - Marking task as FAILED. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T061031, start_date=20220729T061031, end_date=20220729T061031

关于Airflow DAG - 失败的任务没有显示应有的失败状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73146565/

相关文章:

kubernetes - Airflow 调度器无法连接到 Kubernetes 服务 api

c++ - 使用 Boost [C++] 将 http 文件内容读取为字符串

iOS web api数据管理(单例?)

java - 在 Java 中/public_html/下的 Web 服务器中创建一个文件夹

Linux CentOS 6 负载均衡器和Web服务器选择

python - 如何测试使用 XCom 的 Apache Airflow 任务

hadoop - Airflow :如何重新运行依赖的DAG

airflow - 将运算符融合在一起

python - 如何使用 Python 在 Airflow 中的另一个 DAG 成功时触发 DAG?

Tomcat 8 上的 PHP