I just started working with Airflow DAGs and ran into a strange issue with the tool. I am using Airflow version 2.3.3 with the SequentialExecutor.
The script I am using:
import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

dag_args = {
    'owner': 'hao',
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=1)
}

with DAG(
    dag_id='dependency_experiment',
    default_args=dag_args,
    description='experiment the dag task dependency expression',
    start_date=datetime.datetime.now(),
    schedule_interval='@daily',
    dagrun_timeout=datetime.timedelta(seconds=10),
) as dag:
    pyOp = PythonOperator(
        task_id='pyOp',
        python_callable=lambda x: haha * x,  # 'haha' is intentionally undefined to force a failure
        op_kwargs={'x': 10}
    )
    pyOp
A snippet of the log for this task:
NameError: name 'haha' is not defined
[2022-07-27, 18:19:34 EDT] {taskinstance.py:1415} INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220728T021932, start_date=20220728T021934, end_date=20220728T021934
[2022-07-27, 18:19:34 EDT] {standard_task_runner.py:92} ERROR - Failed to execute job 44 for task pyOp (name 'haha' is not defined; 19405)
[2022-07-27, 18:19:34 EDT] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-27, 18:19:34 EDT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
The question:
I deliberately defined a PythonOperator that will fail. When I deployed the script as a DAG, the task raised the exception as expected; however, the status of this task always shows as skipped. I cannot figure out why the task does not show the failed status as expected. Any suggestions would be greatly appreciated.
Best answer
This is because you defined 'retries' and 'retry_delay' in your dag_args dictionary.
default_args (Optional[Dict]) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains ‘depends_on_past’: True here and ‘depends_on_past’: False in the operator’s call default_args, the actual value will be False.
When you set 'retries' to some value, Airflow considers that the task will be retried at a later time, so it shows up as skipped in the UI.
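The rule can be sketched in plain Python (this is not Airflow's actual implementation, just an illustration of the behavior described above):

```python
# Hypothetical sketch of the rule: a failing task is only marked FAILED
# once its retries are exhausted; until then it is marked UP_FOR_RETRY.

def state_after_failure(try_number: int, retries: int) -> str:
    """Return the task state after failed attempt `try_number` (1-based)."""
    if try_number <= retries:
        return "up_for_retry"  # Airflow will schedule another attempt
    return "failed"           # no attempts left; failure is final

# With retries=2, as in the question's dag_args, the first two failures
# are retried and only the third is final:
print(state_after_failure(1, retries=2))  # up_for_retry
print(state_after_failure(3, retries=2))  # failed

# With retries=0, the very first failure is final:
print(state_after_failure(1, retries=0))  # failed
```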
If you remove 'retries' and 'retry_delay' from dag_args, you will see the task set to failed when you trigger the DAG.
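For example, the dag_args from the question would shrink to something like this (keeping only the non-retry settings):

```python
# dag_args without the retry settings: the first failure is then final,
# so the task is marked FAILED instead of UP_FOR_RETRY.
dag_args = {
    'owner': 'hao',
    # 'retries' and 'retry_delay' removed on purpose
}
```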
When I ran your code, I saw this in the log:
INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T060953, start_date=20220729T060953, end_date=20220729T060953
After removing 'retries' and 'retry_delay', the same log line becomes:
INFO - Marking task as FAILED. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T061031, start_date=20220729T061031, end_date=20220729T061031
Regarding "Airflow DAG - failed task does not show the failed status it should", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/73146565/