python-3.x - Airflow Scheduler liveness probe crashing (version 2.0)

Tags: python-3.x kubernetes airflow airflow-scheduler airflow-2.x

I just upgraded Airflow from 1.10.13 to 2.0. I am running it in Kubernetes (AKS on Azure) with the Kubernetes Executor. Unfortunately, my scheduler gets killed every 15-20 minutes because its liveness probe fails, so the pod keeps restarting.
I had no such problem on 1.10.13.
Here is my liveness probe:

import os
# Silence non-error logging so the probe's output stays clean
os.environ['AIRFLOW__CORE__LOGGING_LEVEL'] = 'ERROR'
os.environ['AIRFLOW__LOGGING__LOGGING_LEVEL'] = 'ERROR'

import sys

from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname

# Fetch the most recent SchedulerJob heartbeat row for this host
with create_session() as session:
    job = session.query(SchedulerJob).filter_by(hostname=get_hostname()).order_by(
        SchedulerJob.latest_heartbeat.desc()).limit(1).first()

# Exit 0 (healthy) only if the scheduler job reports itself alive
sys.exit(0 if job.is_alive() else 1)
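One caveat with the probe above (an observation of mine, not part of the original question): the query returns None when no SchedulerJob row matches the current hostname — for instance right after the pod is rescheduled onto a new node — and `job.is_alive()` then raises AttributeError, which still exits non-zero but hides the real reason. A minimal None-safe sketch of the exit-code logic (`probe_exit_code` is a hypothetical helper name; the query itself is unchanged):

```python
def probe_exit_code(job):
    """Translate the newest SchedulerJob row into a probe exit code.

    `job` is whatever the session query above returned: an object
    exposing is_alive(), or None when no row matches this hostname.
    """
    if job is None:
        return 1  # no heartbeat row at all -> report unhealthy explicitly
    return 0 if job.is_alive() else 1

# In the real probe, this would replace the bare sys.exit(...) line:
#   sys.exit(probe_exit_code(job))
```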
When I look at the scheduler logs, I see the following:
[2021-02-16 12:18:21,883] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor489-Process' pid=12812 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:22,228] {scheduler_job.py:933} DEBUG - No tasks to consider for execution.
[2021-02-16 12:18:22,232] {base_executor.py:147} DEBUG - 0 running task instances
[2021-02-16 12:18:22,232] {base_executor.py:148} DEBUG - 0 in queue
[2021-02-16 12:18:22,232] {base_executor.py:149} DEBUG - 32 open slots
[2021-02-16 12:18:22,232] {base_executor.py:158} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-02-16 12:18:22,233] {kubernetes_executor.py:337} DEBUG - Syncing KubernetesExecutor
[2021-02-16 12:18:22,233] {kubernetes_executor.py:263} DEBUG - KubeJobWatcher alive, continuing
[2021-02-16 12:18:22,234] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,234] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,236] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,246] {scheduler_job.py:1390} DEBUG - Next timed event is in 0.143059
[2021-02-16 12:18:22,246] {scheduler_job.py:1392} DEBUG - Ran scheduling loop in 0.05 seconds
[2021-02-16 12:18:22,422] {scheduler_job.py:933} DEBUG - No tasks to consider for execution.
[2021-02-16 12:18:22,426] {base_executor.py:147} DEBUG - 0 running task instances
[2021-02-16 12:18:22,426] {base_executor.py:148} DEBUG - 0 in queue
[2021-02-16 12:18:22,426] {base_executor.py:149} DEBUG - 32 open slots
[2021-02-16 12:18:22,427] {base_executor.py:158} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-02-16 12:18:22,427] {kubernetes_executor.py:337} DEBUG - Syncing KubernetesExecutor
[2021-02-16 12:18:22,427] {kubernetes_executor.py:263} DEBUG - KubeJobWatcher alive, continuing
[2021-02-16 12:18:22,439] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2021-02-16 12:18:22,452] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12819)
[2021-02-16 12:18:22,460] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor490-Process' pid=12819 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,009] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12826)
[2021-02-16 12:18:23,017] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor491-Process' pid=12826 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,594] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12833)

... Many of these Disposing DB connection pool entries here

[2021-02-16 12:20:08,212] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor675-Process' pid=14146 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:08,916] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14153)
[2021-02-16 12:20:08,924] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor676-Process' pid=14153 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:09,475] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14160)
[2021-02-16 12:20:09,484] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor677-Process' pid=14160 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,044] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14167)
[2021-02-16 12:20:10,053] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor678-Process' pid=14167 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,610] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14180)
[2021-02-16 12:23:42,287] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
[2021-02-16 12:23:43,290] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,494] {process_utils.py:201} INFO - Waiting up to 5 seconds for processes to exit...
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=14180, status='terminated', started='12:20:09') (14180) terminated with exit code None
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=9286, status='terminated', exitcode=0, started='12:13:35') (9286) terminated with exit code 0
[2021-02-16 12:23:43,506] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,506] {scheduler_job.py:1296} INFO - Exited execute loop
[2021-02-16 12:23:43,523] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-02-16 12:23:43,525] {settings.py:290} DEBUG - Disposing DB connection pool (PID 7)

Best Answer

I managed to fix the restarts by setting the following configuration:

[kubernetes]
...
delete_option_kwargs = {"grace_period_seconds": 10}
enable_tcp_keepalive = True
tcp_keep_idle = 30
tcp_keep_intvl = 30
tcp_keep_cnt = 30
I run another Airflow-on-Kubernetes instance in AWS. That one works fine on every version, which made me realize the problem lies with Azure Kubernetes and the REST API calls to the API server.
Posting this in case it helps someone else....
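If the scheduler image is configured through environment variables rather than a mounted airflow.cfg, the same [kubernetes] options can be expressed with Airflow's AIRFLOW__{SECTION}__{KEY} convention. A sketch (the deployment wiring that consumes these variables is assumed):

```shell
# Environment-variable equivalents of the [kubernetes] settings above
export AIRFLOW__KUBERNETES__DELETE_OPTION_KWARGS='{"grace_period_seconds": 10}'
export AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE=True
export AIRFLOW__KUBERNETES__TCP_KEEP_IDLE=30
export AIRFLOW__KUBERNETES__TCP_KEEP_INTVL=30
export AIRFLOW__KUBERNETES__TCP_KEEP_CNT=30
```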

Regarding python-3.x - Airflow Scheduler liveness probe crashing (version 2.0), a similar question was found on Stack Overflow: https://stackoverflow.com/questions/66225341/
