python - Airflow sensor timeout is never reached

Tags: python airflow etl

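I have an Airflow HttpSensor. Its default timeout is 7 days, but the DAG fails within 5 minutes. I tried passing the timeout parameter, but it seems to have no effect on the actual timeout.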

DAG run log:

[2021-04-01 07:55:10,416] {{taskinstance.py:670}} INFO - Dependencies all met for <TaskInstance: piracy_overall_counters.wait_trial_report 2021-03-01T00:00:00+00:00 [queued]>
[2021-04-01 07:55:10,466] {{taskinstance.py:670}} INFO - Dependencies all met for <TaskInstance: piracy_overall_counters.wait_trial_report 2021-03-01T00:00:00+00:00 [queued]>
[2021-04-01 07:55:10,466] {{taskinstance.py:880}} INFO - 
--------------------------------------------------------------------------------
[2021-04-01 07:55:10,466] {{taskinstance.py:881}} INFO - Starting attempt 5 of 6
[2021-04-01 07:55:10,466] {{taskinstance.py:882}} INFO - 
--------------------------------------------------------------------------------
[2021-04-01 07:55:10,494] {{taskinstance.py:901}} INFO - Executing <Task(StorageReportSensor): wait_trial_report> on 2021-03-01T00:00:00+00:00
[2021-04-01 07:55:10,529] {{standard_task_runner.py:54}} INFO - Started process 69 to run task
[2021-04-01 07:55:10,621] {{standard_task_runner.py:77}} INFO - Running: ['airflow', 'run', 'piracy_overall_counters', 'wait_trial_report', '2021-03-01T00:00:00+00:00', '--job_id', '9496', '--pool', 'light', '--raw', '-sd', 'DAGS_FOLDER/airflow_dags/piracy_overall_counters.py', '--cfg_path', '/tmp/tmptm6wrwe_']
[2021-04-01 07:55:10,622] {{standard_task_runner.py:78}} INFO - Job 9496: Subtask wait_trial_report
[2021-04-01 07:55:10,726] {{logging_mixin.py:112}} INFO - Running <TaskInstance: piracy_overall_counters.wait_trial_report 2021-03-01T00:00:00+00:00 [running]> on host 228ef2809a07
[2021-04-01 07:55:10,784] {{http_sensor.py:77}} INFO - Poking: reports/2021-04-01/Custom/***


**** retry for 5 min

[2021-04-01 07:59:12,511] {{http_sensor.py:77}} INFO - Poking: reports/2021-04-01/Custom/****
[2021-04-01 07:59:12,529] {{base_hook.py:89}} INFO - Using connection to: id: storage. ****
[2021-04-01 07:59:12,530] {{http_hook.py:136}} INFO - Sending 'GET' to url: ****
  InsecureRequestWarning,
[2021-04-01 07:59:12,890] {{http_hook.py:150}} ERROR - HTTP error: Not Found
[2021-04-01 07:59:12,890] {{http_hook.py:151}} ERROR - <html>
<head><title>404 Not Found</title></head>
<body>
<center><h1>404 Not Found</h1></center>
<hr><center>nginx/1.18.0</center>
</body>
</html>

[2021-04-01 08:00:10,785] {{timeout.py:42}} ERROR - Process timed out, PID: 69
[2021-04-01 08:00:10,804] {{taskinstance.py:1150}} ERROR - Timeout, PID: 69
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 979, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/sensors/base_sensor_operator.py", line 122, in execute
    sleep(self.poke_interval)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 69
[2021-04-01 08:00:10,806] {{taskinstance.py:1194}} INFO - Marking task as UP_FOR_RETRY. dag_id=piracy_overall_counters, task_id=wait_trial_report, execution_date=20210301T000000, start_date=20210401T075510, end_date=20210401T080010

[2021-04-01 08:00:11,629] {{email.py:132}} INFO - Sent an alert email to [***]
[2021-04-01 08:00:16,697] {{local_task_job.py:102}} INFO - Task exited with return code 1
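The start_date and end_date recorded in the "Marking task as UP_FOR_RETRY" line show how long the attempt actually ran. This small calculation simply parses those two values as printed in the log:

```python
from datetime import datetime

# start_date and end_date copied from the "Marking task as UP_FOR_RETRY" log line
start = datetime.strptime("20210401T075510", "%Y%m%dT%H%M%S")
end = datetime.strptime("20210401T080010", "%Y%m%dT%H%M%S")

elapsed = end - start
print(elapsed)                  # 0:05:00
print(elapsed.total_seconds())  # 300.0
```

The attempt lasted exactly 5 minutes, nowhere near the sensor's 7-day timeout. That suggests a separate 5-minute limit applied to the task.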

Best answer

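It isn't obvious, but a Sensor is just a type of Operator, so the DAG's default_args apply to sensors too. I always set execution_timeout in default_args, and its value was much smaller than 7 days, so that is what caused the timeout.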
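The two limits are checked differently, which is why the smaller one wins. The traceback shows the sensor loop: poke, compare the elapsed time against the sensor's own `timeout`, then `sleep(self.poke_interval)`. `execution_timeout` is enforced separately while the task runs, and here it interrupted that sleep. The following is a simplified simulation under those assumptions, not Airflow's code. `first_limit_hit` is a hypothetical helper, the poke is assumed never to succeed, and 60 seconds and 7 days are used because they are `BaseSensorOperator`'s default `poke_interval` and `timeout`:

```python
from datetime import timedelta


def first_limit_hit(sensor_timeout, execution_timeout, poke_interval):
    """Simulate a sensor whose poke never succeeds (hypothetical helper, not Airflow code).

    Returns the name of the limit that ends the attempt and the elapsed time.
    execution_timeout=None means no task-level limit.
    """
    elapsed = timedelta(0)
    while True:
        # Sensor-level check, done between pokes (Airflow raises AirflowSensorTimeout here).
        if elapsed > sensor_timeout:
            return "timeout", elapsed
        # Sleep until the next poke. The task-level limit can interrupt this sleep
        # (Airflow raises AirflowTaskTimeout, as in the traceback above).
        next_elapsed = elapsed + poke_interval
        if execution_timeout is not None and next_elapsed >= execution_timeout:
            return "execution_timeout", execution_timeout
        elapsed = next_elapsed


print(first_limit_hit(timedelta(days=7), timedelta(minutes=5), timedelta(seconds=60)))
# ('execution_timeout', datetime.timedelta(seconds=300))
```

With a 5-minute execution_timeout inherited from default_args, the 7-day sensor timeout is never reached.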

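To prevent this kind of problem, you can either increase execution_timeout in default_args or pass it explicitly to the sensor to override the default. For example: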

from datetime import timedelta
HttpSensor(task_id='wait_trial_report', endpoint='index.html', timeout=timeout, execution_timeout=timedelta(seconds=timeout))
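Passing execution_timeout explicitly works because Airflow fills in operator arguments from default_args only when they were not passed directly. This sketch models that precedence with a plain dict merge. `resolve_task_kwargs` is a hypothetical helper, not Airflow's implementation, and the 5-minute default is an assumption inferred from the log:

```python
from datetime import timedelta


def resolve_task_kwargs(default_args, explicit_kwargs):
    """Model of default_args precedence (hypothetical helper): explicit arguments win."""
    return {**default_args, **explicit_kwargs}


# Assumed DAG-level defaults: a short execution_timeout meant for ordinary tasks.
default_args = {"execution_timeout": timedelta(minutes=5)}
timeout = 7 * 24 * 60 * 60  # the sensor's 7-day timeout, in seconds

# Sensor that only sets its own timeout: it inherits the 5-minute execution_timeout.
inherited = resolve_task_kwargs(default_args, {"timeout": timeout})

# Sensor that also sets execution_timeout explicitly: the DAG default is overridden.
overridden = resolve_task_kwargs(
    default_args,
    {"timeout": timeout, "execution_timeout": timedelta(seconds=timeout)},
)

print(inherited["execution_timeout"])   # 0:05:00
print(overridden["execution_timeout"])  # 7 days, 0:00:00
```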

For "python - Airflow sensor timeout is never reached", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/66903237/
