loops - 如何在 Airflow BashOperator 和 SSHOperator 中的 jinja 模板中传递循环和动态任务 ID

标签 loops templates jinja2 airflow

我正在尝试在循环中创建多个任务,并在 BashOperatorSSHOperator 中传递动态生成的 PythonOperator 任务 ID 以进行 XCOM 拉取.

for group_key in range(1,5):
    dag = create_dag(group_key)
    globals()[dag.dag_id] = dag

    for i in range(2):  
        delete_xcom_task = PostgresOperator(
          task_id='delete-xcom-task_'+str(i),
          postgres_conn_id='pg_airflow_db',
          sql= "delete from xcom where dag_id= '" + dag.dag_id + "' ",
         dag=dag)

        t0_get_next = PythonOperator(
            task_id='load_hist_pkg_to_ts_'+ str(i),
            provide_context=True,
            python_callable=load_hist_pkg_to_ts,
            xcom_push=True,
            op_kwargs={'pg_conn_id':pg_conn_id,
                       'pg_conn_schema': pg_conn_schema,
                      },
            dag=dag)

        t1_check_file=ShortCircuitOperator(
            task_id='check_if_file_exist_'+str(i),
            python_callable=_check_files,
            provide_context=True,
            op_kwargs={'load_hist_pkg_to_ts_cnt':'load_hist_pkg_to_ts_'+str(i),
                      },
            dag=dag,
        )

        t0_move_file = BashOperator(
            task_id='bash_move_file_'+str(i),
            bash_command= "{{ ti.xcom_pull(key = 't0_bash_move_file' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
            dag=dag)


        t1_copyfile = SSHOperator(
            ssh_conn_id='ssh_execute_rempte',
            task_id='ssh_file_to_postgres_'+str(i),
            xcom_push=True,
            command= "{{ ti.xcom_pull(key = 't1_bash_copy' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
            dag=dag)

        t2_archive_file = BashOperator(
            task_id='bash_archive_file_'+str(i),
            bash_command= "{{ ti.xcom_pull(key = 't2_bash_remove_file' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
            dag=dag)

        delete_xcom_task>>t0_get_next >>t1_check_file>>t0_move_file>>t1_copyfile>>t2_archive_file

最佳答案

已解决 - 必须像下面这样做

bash_command= "{{{{ ti.xcom_pull(key = 't2_bash_remove_file' , task_ids = 'load_hist_pkg_to_ts_{}' , dag_id = ti.dag_id)}}}}".format(i),

关于loops - 如何在 Airflow BashOperator 和 SSHOperator 中的 jinja 模板中传递循环和动态任务 ID,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57151376/

相关文章:

javascript - 将 JavaScript 对象数据传递到 Bootstrap 模式

python - Pandas:迭代并插入具有组内条件的列复杂问题

c++ - 哥德巴赫猜想程序编写中的问题

javascript - AngularJS:在指令模板函数中返回一个 promise

c++ - "end"不能在模板函数中使用

python - 是否可以将 AngularJS 与 Jinja2 模板引擎一起使用?

java - 将图像转换为二进制字符串

c++ - 绑定(bind)可变模板参数时拷贝过多

javascript - 在 Flask 模板中运行 Ember-Table

python - 如何从 sqlalchemy 获取元数据以在 jinja 中显示为表格?