scheduler - Airflow scheduler keeps running a paused DAG

Tags: scheduler, airflow

Airflow webserver

The DAG is paused:
DAG is Paused

The Airflow scheduler keeps running the DAG

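I have a fresh install of Airflow. All of the bundled example DAGs behave correctly when paused, and pausing/unpausing works fine on them. My new DAG, however, keeps running even though it is paused in the web UI.
Another question: the DAG is scheduled to run every 6 hours, so why does the scheduler keep trying to run it?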

SWAT_Tutorial_01.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2018, 3, 16),
        'email': ['sample@scholastic.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'catchup': False,
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
}

dag = DAG( 'SWAT.Tutorial_01', default_args=default_args, schedule_interval=timedelta(hours=6))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag)

t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)
t3 = BashOperator( task_id='hello', bash_command='echo "Hello World"', retries=3, dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Scheduler log (repeats continuously)
    File Path                                         PID  Runtime    Last Runtime    Last Run
----------------------------------------------  -----  ---------  --------------  ----------
/home/airflow/airflow/dags/SWAT_Tutorial_01.py  16930  0.00s
================================================================================
[2018-03-17 23:41:45,352] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-03-17 23:41:45,352] {jobs.py:1440} INFO - Heartbeating the executor
[2018-03-17 23:41:46,354] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-03-17 23:41:46,354] {dag_processing.py:559} INFO - Processor for /home/airflow/airflow/dags/SWAT_Tutorial_01.py finished
[2018-03-17 23:41:46,356] {dag_processing.py:627} INFO - Started a process (PID: 16932) to generate tasks for /home/airflow/airflow/dags/SWAT_Tutorial_01.py - logging into /var/log/airflow/scheduler/2018-03-17/SWAT_Tutorial_01.py.log
[2018-03-17 23:41:46,357] {jobs.py:1440} INFO - Heartbeating the executor
[2018-03-17 23:41:47,358] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-03-17 23:41:47,359] {jobs.py:1440} INFO - Heartbeating the executor
[2018-03-17 23:41:48,360] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-03-17 23:41:48,360] {dag_processing.py:559} INFO - Processor for /home/airflow/airflow/dags/SWAT_Tutorial_01.py finished
[2018-03-17 23:41:48,362] {dag_processing.py:627} INFO - Started a process (PID: 16934) to generate tasks for /home/airflow/airflow/dags/SWAT_Tutorial_01.py - logging into /var/log/airflow/scheduler/2018-03-17/SWAT_Tutorial_01.py.log
[2018-03-17 23:41:48,363] {jobs.py:1440} INFO - Heartbeating the executor

Best answer

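This is only a guess.
I think that even when you pause the DAG, Airflow does not stop backfilling, so the runs you are seeing are probably backfill runs.
From your code, it looks like you have not disabled backfilling. The setting is spelled catchup, and it is an argument of the DAG itself rather than an entry in default_args, e.g. DAG('SWAT.Tutorial_01', default_args=default_args, schedule_interval=timedelta(hours=6), catchup=False). In your file it appears only as a commented-out default_args entry.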
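To make the backfill guess concrete, here is a rough sketch of how many 6-hour intervals would already have elapsed between the DAG's start_date and the timestamp in the scheduler log. It uses only the standard library and does not call Airflow. The helper name pending_execution_dates is hypothetical, and the rule that a run becomes due once its whole interval has passed is a simplified model of the scheduler, assuming naive datetimes in a single timezone.

```python
from datetime import datetime, timedelta


def pending_execution_dates(start_date, interval, now):
    """Return the start of every interval that has fully elapsed by `now`.

    Hypothetical helper: a simplified model of catchup, not Airflow code.
    """
    dates = []
    current = start_date
    # A run for [current, current + interval) is due once the interval ends.
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


# Values taken from the question: start_date, schedule_interval, log timestamp.
start_date = datetime(2018, 3, 16)
interval = timedelta(hours=6)
now = datetime(2018, 3, 17, 23, 41, 45)

runs = pending_execution_dates(start_date, interval, now)
print(len(runs))   # 7
print(runs[-1])    # 2018-03-17 12:00:00
```

Under this model, seven runs would be waiting with catchup enabled. With catchup=False, the scheduler is expected to create a run only for the most recent interval. Whether any of these runs are actually started while the DAG is paused is the part of the answer that remains a guess.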

Source: this question, "Airflow scheduler keeps running a paused DAG", comes from Stack Overflow: https://stackoverflow.com/questions/49343078/
