python - Airflow:如何确保 DAG 每 5 分钟运行一次?

标签 python python-2.7 airflow airflow-scheduler

我正在探索 Apache Airflow。我正在使用一种在 MySQL 中插入记录的方法。

我已安排 DAG 每 5 分钟运行一次,但它似乎没有发生,因为 MYSQL 时间戳告诉 MySQL 任务在 5 分钟内执行了多次。

enter image description here

如您所见,它会在几分钟内插入记录。下面是我的代码:

import datetime as dt

from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def fetch_data_mysql():
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    sql = 'SELECT * from random_table'
    sql = "INSERT INTO random_table(text) VALUES ('Hi Adnan')"
    print('INSERT MYSQL RESULT')
    # results = mysql_hook.get_records(sql)
    # results = mysql_hook.run(sql, autocommit=True, parameters=('Hi Addu',))
    mysql_hook.run(sql, autocommit=True)

def print_world():
    print('world')
    return 'WORLD IN SEPTEMBER'


default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2018, 9, 11),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2),
}

with DAG('airflow_tutorial_v01',
         default_args=default_args,
         schedule_interval='0/5 * * * *',
         ) as dag:
    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)
    mysql_task = PythonOperator(task_id='mysql_tut', python_callable=fetch_data_mysql)

print_hello >> sleep >> print_world >> mysql_task

我正在使用 v1.10.0

日志链接在这里:- https://www.dropbox.com/s/f0g64mhi8sgzlvw/my_simple_dag.py.log?dl=0

最佳答案

你的狗正在回填。如果您检查日志,它的执行日期是 2018-09-20 00:15:00+00:002018-09-20 00:20:00+00:00, 2018-09-20 00:25:00+00:00, 依此类推。

将以下内容添加到您的default_args:

'catchup_by_default':假

您的 default_args 应该如下所示:

default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2018, 9, 11),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2),
    'catchup_by_default': False,
}

关于python - Airflow:如何确保 DAG 每 5 分钟运行一次?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52425237/

相关文章:

Python 不能对 csv 文件使用 loadtxt

python - 如何将给定的序数(从 Excel)转换为日期

python - 为什么 Django 1.9 将设置和 URL 中的 tuples () 替换为列表 []?

google-cloud-dataflow - 使用 Dataflow 与 Cloud Composer

通过python运算符检查 Airflow 连接列表

python - Pandas 数据帧将值除以另一个数据帧中的匹配值

Python函数缩进错误: unexpected indent

python - 如何获取二维字典 python 中的所有键

javascript - 在高度受限的环境中无需安装 python 即可运行 python 代码

python - Apache Airflow 导入错误 : cannot import name '_psutil_linux'