Airflow 无法识别 DAG 调度

标签 airflow airflow-scheduler

我正在尝试制定每周、每月的 Airflow 计划,但不起作用。有人可以报告可能发生的情况吗?如果我每周、每月进行安排,它就会保持静止,就好像它被关闭一样。没有错误信息,只是不执行。我发送了一个代码示例来演示我如何安排...还有其他方法可以进行此安排吗?

import airflow
import os
import six
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.operators.slack_operator import SlackAPIPostOperator

default_args = {
    'owner': 'bexs-data',
    'start_date': airflow.utils.dates.days_ago(0),
    'depends_on_past': False,
    'email': ['<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="0b6a62796d67647c4b6a7b6a68636e2564796c" rel="noreferrer noopener nofollow">[email protected]</a>'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': slack_msg
}

dag = DAG(
    dag_id=nm_dag,
    default_args=default_args,
    schedule_interval='51 18 * * 4', 
    dagrun_timeout=timedelta(minutes=60)
)

最佳答案

有关于不执行以下操作的文档: 'start_date': airflow.utils.dates.days_ago(0), 因为这样一来,自您的开始日期起就不会再有 1 周的间隔,这意味着第一个间隔不会关闭,并且第一次运行未安排。

建议:选择上周的第 4 日(星期四?)中的固定日期作为您的 start_date

Airflow will accept a datetime or a string 。对于非 UTC 计划,请使用 airflow.utils.timezonedatetime。例如,:

default_args = {
  'owner': 'your-unix-user-id-or-ldap-etc',
  'start_date': '2018-1-1',
...
}

from airflow.utils.timezone import datetime

default_args = {
  'owner': 'your-unix-user-id-or-ldap-etc',
  'start_date': datetime(2018, 1, 1),
...
}

关于Airflow 无法识别 DAG 调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58207766/

相关文章:

python - TensorFlow Extended (TFX) : Clarify Beam, Airflow 和 Kubeflow 使用

google-cloud-platform - Google Cloud Composer 变量不会传播到 Airflow

python - 如何在 Airflow 中的 SLA 上设置时间对象而不是 timedelta?

Airflow 调度 : how to run initial setup task only once?

airflow - 是否可以在 Airflow 任务执行期间检索最后一次成功执行任务的日期?

deployment - 使用 Airflow 进行零停机部署

airflow - 是否有 Airflow 日志文件返回代码的文档?

google-cloud-storage - 如何等待作业完成或文件在 Airflow 中更新

kubernetes - Airflow kubernetes pod 运算符(operator)和任务之间共享文件?

airflow - 仅运行最新的 Airflow DAG