airflow - 运行 dag 并让 Airflow 运行 : error: the following arguments are required: task_id,execution_date

标签 airflow

假设我有一个 easteregg.py 文件:

from airflow import DAG

from dateutil import parser
from datetime import timedelta, datetime, time
from airflow.operators import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from easteregg import easteregg_workflows as wf

defaults = {
    'owner': "JohnDoe",
    'depends_on_past': False,
    'email': ['john.doe@mail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
    'start_date': parser.parse('2019-01-01 00:00:00')
}

dag = DAG('easteregg', default_args=defaults, 
    schedule_interval="*/5 * * * *")


step1 = PythonOperator(
    task_id="step1",
    python_callable=wf.run1,
    default_args=defaults,
    provide_context=True,
    pool="pool",
    dag=dag)

step0 = DummyOperator(
    task_id="step0",
    dag=dag)

step0 >> step1

easteregg_workflows.py 文件很简单:

def run1(**kwargs):
    logging.info("Hello airflow 1")

我现在想简单地在 Airflow 上触发这项工作。所以我发出了命令:

Airflow 运行 eastereg

但它向我抛出一个错误airflow run:错误:需要以下参数:task_id,execution_date

为了使这个 DAG 能够运行,我需要缺少什么?

最佳答案

您的 dag 文件没有丢失任何内容。 airflow run eastereg 是一个不正确的命令。 airflow run 不运行 dag。相反,它运行一个 task_id

尝试使用airflow run -h

usage: airflow run [-h] [-sd SUBDIR] [-m] [-f] [--pool POOL]
                   [--cfg_path CFG_PATH] [-l] [-A] [-i] [-I] [--ship_dag]
                   [-p PICKLE] [-int]
                   dag_id task_id execution_date

Airflow 需要 dag_id、task_id 和execution_date

positional arguments:
dag_id The id of the dag
task_id The id of the task
execution_date The execution date of the DAG

要运行 dag,您可以使用 trigger_dag 命令。 https://airflow.apache.org/cli.html

Airflow trigger_dag eastereg

Usage:airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF] [-e EXEC_DATE]
                    dag_id

关于airflow - 运行 dag 并让 Airflow 运行 : error: the following arguments are required: task_id,execution_date,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55254342/

相关文章:

python - Airflow - 分支连接运算符

docker - 在BashOperator dag中使用 Airflow 变量

amazon-web-services - 在 Apache Airflow DAG 中使用 AWS SES 发送失败的电子邮件

airflow - Composer 2/GKE Autopilot 集群 PodOperator 任务的工作负载身份和服务帐户

flask - Airflow Webserver 访问日志的位置

python - PythonOperator 内的 Airflow PythonOperator

python - 以编程方式在 dockerized apache Airflow python 操作符内创建 SSH 隧道

scheduler - 连续运行暂停 DAG 的 Airflow 调度程序

kubernetes - Airflow Kubernetes Executor 是否运行任何 Operator?

airflow - 使用 XCOM 值在 Airflow 中创建动态工作流