airflow-scheduler - 大量任务时, Airflow 调度器不调度(或缓慢)

标签 airflow-scheduler google-cloud-composer airflow

我在 Google Cloud Composer(版本:composer-1.10.2-airflow-1.10.6)上使用 Airflow 。

我意识到当有很多任务要处理时,我的调度程序不会安排任务(参见下面的甘特图)

Gantt View (不要注意颜色,红色任务是“createTable Operators”,如果表已经存在则失败,因此在 DAG 的下一部分(重要部分)运行之前它们必须失败 5 次)

任务之间有时间间隔! (例如上午 10 点到下午 15 点之间的 5 个小时没有任何反应)

通常它可以很好地处理大约 40 个 DAG,每个 DAG 大约有 100-200 个任务(有时更多)。但最近我添加了 2 个有很多任务的 DAG(每个大约 5000 个)并且调度程序非常慢或者不调度任务。 在屏幕截图上,我在下午 15 点暂停了 2 个有很多任务的 DAG,调度程序又回来了,工作正常。

你有什么解决办法吗?

Airflow 旨在成为处理“无限”数量任务的工具。

以下是有关我的环境的一些信息:

  • 版本:composer-1.10.2-airflow-1.10.6
  • 集群大小:6(12 个 vCPU,96GB 内存)

这里有一些关于 Airflow 配置的信息:

╔════════════════════════════════╦═══════╗
║ Airflow parameter              ║ value ║
╠════════════════════════════════╬═══════╣
║ -(celery)-                     ║       ║
║ worker_concurrency             ║ 32    ║
║ -(webserver)-                  ║       ║
║ default_dag_run_display_number ║ 2     ║
║ workers                        ║ 2     ║
║ worker_refresh_interval        ║ 60    ║
║ -(core)-                       ║       ║
║ max_active_runs_per_dag        ║ 1     ║
║ dagbag_import_timeout          ║ 600   ║
║ parallelism                    ║ 200   ║
║ min_file_process_interval      ║ 60    ║
║ -(scheduler)-                  ║       ║
║ processor_poll_interval        ║ 5     ║
║ max_threads                    ║ 2     ║
╚════════════════════════════════╩═══════╝

谢谢你的帮助

编辑:

我的 26 个 DAG 是由单个 .py 文件创建的,它通过解析一个巨大的 JSON 变量来创建所有 DAG 和任务。

也许问题来自于此,因为今天 Airflow 正在调度其他 DAG 的任务,而不是我描述的 26 个(尤其是 2 个大 DAG)。 更准确地说,Airflow 有时会安排我的 26 个 DAG 的任务,但它更容易、更频繁地安排其他 DAG 的任务。

最佳答案

高任务间延迟通常表明存在与调度程序相关的瓶颈(而不是与工作人员相关的瓶颈)。即使一遍又一遍地运行相同的 DAG,Composer 环境仍然可能遇到这样的性能瓶颈,因为每次的工作分配可能不同,或者可能有不同的进程在后台运行。

首先,我建议增加调度程序可用的线程数 (scheduler.max_threads),然后确保您的调度程序不会占用其所在节点的所有 CPU。您可以通过确定调度程序所在节点的位置,然后在 Cloud Console 中检查来检查调度程序所在节点的 CPU 指标。要查找节点名称:

# Obtain the Composer namespace name
kubectl get namespaces | grep composer

# Check for the scheduler
kubectl get pods -n $NAMESPACE -o wide | grep scheduler

如果上述方法没有帮助,那么也可能是调度器故意阻塞某个条件。要检查调度程序检查要运行的任务时评估的所有条件,请设置 core.logging_level=DEBUG。在调度程序日志(您可以在 Cloud Logging 中对其进行过滤)中,您可以检查所有通过或失败的条件,以便任务运行或保持排队。

关于airflow-scheduler - 大量任务时, Airflow 调度器不调度(或缓慢),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62197009/

相关文章:

python - Airflow:使用新计划从头开始重新运行 DAG

kubernetes - 为什么不应该从 Composer 运行 Kubernetes pod 超过一个小时?

google-cloud-platform - 如何在 Cloud Composer 中重新启动网络服务器

sqlalchemy - 从 Airflow 连接 postgres 数据库时出错

Airflow 用户访问管理

kubernetes - 如何让Google Cloud Composer( Airflow )在其他kubernetes群集上运行作业?

当 DAG A 当天完成后,Airflow DAG B 运行多次

python - Airflow - 外部 API 调用给出 Negsignal.SIGSEGV 错误

python - 如何运行 BigQuery 查询,然后将输出 CSV 发送到 Apache Airflow 中的 Google Cloud Storage?

airflow-scheduler - 如何调度不同参数的DAG