celery - Airflow - 如何让工作人员完成所有 dag run 任务?

标签 celery airflow

我目前正在使用 Airflow 和 Celery 处理文件。工作人员需要下载文件、处理它们并在之后重新上传它们。我的 DAG 只需要一名 worker 就可以了。但是当我添加一个时,事情就变得复杂了。

工作人员在有空时接受任务。 Worker1 可以承担“处理下载的文件”任务,但 Worker2 承担了“下载文件”任务,因此任务失败,因为它无法处理不存在的文件。

有没有办法向工作人员(或调度程序)指定 DAG 必须仅在一个工作人员上运行?我知道队列。但我已经在使用它们了。

最佳答案

在这种情况下,您可以使用 Airflow 变量来保存所有工作节点的名称。 例如:

  • 变量:worker_list
  • 值:boxA, boxB, boxC

运行 Airflow worker 时,您可以指定多个作业队列。例如:airflow worker job_queue1,job_queue2 对于你的情况,我将运行 airflow worker af_<hostname>

在你的 DAG 代码中,只需要获取那个 worker_list Airflow 变量,随机选择一个框,然后将你所有的作业排队到 af_<random_selected_box>排队

关于celery - Airflow - 如何让工作人员完成所有 dag run 任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54905657/

相关文章:

python - 如何使用 Python 在 Airflow 中的另一个 DAG 成功时触发 DAG?

airflow - Cloud Composer 定价

python - Airflow 使用执行日期 : 'datetime' is undefined

python - 当 DAG 运行时,我们如何处理 python 脚本中的异常?

python - 如何阻止 Celery 在此任务中创建重复的用户?

python - 具有多个装饰器的 celery 任务不会自动注册任务名称

django - 无法连接到redis ://localhost:6379/0: Error 111 connecting to localhost:6379. 连接被拒绝

python - 将 celery worker 作为守护进程运行时不要创建 pidfile 和 logfile

python - Celery 段错误

python - Airflow 网络服务器获取值错误 :Samesite