python - 如何运行一个简单的 Airflow DAG

标签 python airflow

我是 Airflow 的新手。我想在指定日期运行一个简单的 DAG。我正在努力区分开始日期、执行日期和回填日期。运行 DAG 的命令是什么?

这是我从那以后尝试过的:

airflow run dag_1 task_1 2017-1-23

我第一次运行该命令时,任务执行正确,但当我再次尝试时它没有工作。

这是我运行的另一个命令:

airflow backfill dag_1 -s 2017-1-23 -e 2017-1-24

我不知道该命令会带来什么。 DAG 会在每天 23 点到 24 点执行吗?

在运行上面的两个命令之前,我这样做了:

airflow initdb
airflow scheduler 
airflow webserver -p 8085 --debug &

这是我的DAG

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_1', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='create_clients',
    bash_command='Rscript /scripts/Cli.r',
    dag=dag)

t2 = BashOperator(
    task_id='create_operation',
    bash_command='Rscript Operation.r',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

截图:Tree View

更新

airflow run dag_1 task_1 2017-1-23T10:34

最佳答案

如果你用

运行一次
airflow run dag_1 task_1 2017-1-23

运行已保存,再次运行将不会执行任何操作,您可以尝试通过强制重新运行它

airflow run --force=true dag_1 task_1 2017-1-23

airflow backfill 命令将运行本应在从开始日期到结束日期指定的时间段内运行的任何执行。这将取决于您在 DAG 上设置的时间表,如果您将其设置为每小时触发一次,它应该运行 24 次,但它也不会重新执行之前执行的运行。

您可以像从未运行过一样清除任务

airflow clear dag_1 -s 2017-1-23 -e 2017-1-24

另请在此处查看 cli 文档:https://airflow.incubator.apache.org/cli.html

关于python - 如何运行一个简单的 Airflow DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41805265/

相关文章:

python - 如何将 Airflow 与 Pywikibot 结合使用

Airflow 2 - 调试为什么 dag 没有加载

python - react + socket.io 发出未通过 - 基本错误

python - 如何在 python 的 gekko 优化器中使用数组

command-line - 激活 Airflow DAG的命令行选项

python - 将 pandas parquet 写入分区到 s3

python - 解析在 Django 中使用 Ajax GET 方法发送的 json 对象

python - 在 Python 中使用函数列出文件夹名称

python - Youtube API - 设置视频缩略图(python)

Airflow 错误 - ValueError : Unable to configure handler 'file.processor'