假设我有一个 easteregg.py
文件:
from airflow import DAG
from dateutil import parser
from datetime import timedelta, datetime, time
from airflow.operators import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from easteregg import easteregg_workflows as wf
defaults = {
'owner': "JohnDoe",
'depends_on_past': False,
'email': ['john.doe@mail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15),
'start_date': parser.parse('2019-01-01 00:00:00')
}
dag = DAG('easteregg', default_args=defaults,
schedule_interval="*/5 * * * *")
step1 = PythonOperator(
task_id="step1",
python_callable=wf.run1,
default_args=defaults,
provide_context=True,
pool="pool",
dag=dag)
step0 = DummyOperator(
task_id="step0",
dag=dag)
step0 >> step1
easteregg_workflows.py
文件很简单:
def run1(**kwargs):
logging.info("Hello airflow 1")
我现在想简单地在 Airflow 上触发这项工作。所以我发出了命令:
Airflow 运行 eastereg
但它向我抛出一个错误airflow run:错误:需要以下参数:task_id,execution_date
。
为了使这个 DAG 能够运行,我需要缺少什么?
最佳答案
您的 dag 文件没有丢失任何内容。
airflow run eastereg
是一个不正确的命令。 airflow run
不运行 dag。相反,它运行一个 task_id
尝试使用airflow run -h
。
usage: airflow run [-h] [-sd SUBDIR] [-m] [-f] [--pool POOL]
[--cfg_path CFG_PATH] [-l] [-A] [-i] [-I] [--ship_dag]
[-p PICKLE] [-int]
dag_id task_id execution_date
Airflow 需要 dag_id、task_id 和execution_date
positional arguments:
dag_id The id of the dag
task_id The id of the task
execution_date The execution date of the DAG
要运行 dag,您可以使用 trigger_dag
命令。 https://airflow.apache.org/cli.html
Airflow trigger_dag eastereg
Usage:airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF] [-e EXEC_DATE]
dag_id
关于airflow - 运行 dag 并让 Airflow 运行 : error: the following arguments are required: task_id,execution_date,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55254342/