python - 如何在一项任务中运行 dag

标签 python airflow scheduler

default_args = {
"owner": "airflow_admin",
"start_date": datetime(2017, 8, 29),
"schedule_interval":'*/5 * * * *'
 }
with DAG(
'my_first_dag',
catchup=False,
default_args=default_args,
) as dag:

   task1 = BashOperator(
   task_id='id1',
   bash_command=bash_.format('db1', 'tab1'))

task1

有谁知道如何通过一项任务启动 dag,上面的代码不起作用,dag 已正确启动

最佳答案

确保您的缩进正确。您的任务应在 DAG 上下文管理器的范围内定义,否则您必须向您的任务提供 dag 参数:

尝试:


default_args = {
    "owner": "airflow_admin",
    "start_date": datetime(2017, 8, 29),
    "schedule_interval": "0 5 * * *"
}
with DAG('my_first_dag', catchup=False, default_args=default_args) as dag:
    task1 = BashOperator(
        task_id='id1',
        bash_command=bash_.format('db1', 'tab1'))

或者不使用上下文管理器:

default_args = {
    "owner": "airflow_admin",
    "start_date": datetime(2017, 8, 29),
    "schedule_interval": "0 5 * * *"
}
dag = DAG('my_first_dag', catchup=False, default_args=default_args)
task1 = BashOperator(
        task_id='id1',
        bash_command=bash_.format('db1', 'tab1'),
        dag=dag)

关于python - 如何在一项任务中运行 dag,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62233244/

相关文章:

Python Flask after_request 和 before_request 用于一组特定的请求

python - 为什么在我排除之后不能调用异常?

python - 在 OpenCV Python 中创建轨迹栏以滚动大图像

使用 Airflow 测试与使用 DebugExecutor 调试 Airflow 任务

google-compute-engine - 如何将 Airflow 调度程序作为守护进程运行?

python - 当图像转换为 numpy 数组时如何调整图像大小

python - 动态构建集合以在 Airflow dag 中循环

airflow - 无法在 Airflow 中部署 DAG

TYPO3:通过调度程序发布工作区编辑?

load-balancing - 如何使用c#实现加权循环?