airflow - 如何将带有参数的SQL作为文件传递给Airflow Operator

标签 airflow

我在 Airflow 中有一个运算符(operator):

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

现在,我需要运行的实际查询长度为24行。我想将其保存在文件中,并为运算符(operator)提供SQL文件的路径。运营商支持这一点,但是我不确定该如何处理需要SQL的参数。

有什么建议吗?

编辑:
这是我的代码:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

这给出了:

jinja2.exceptions.UndefinedError: 'templates_dict' is undefined

最佳答案

如您所见,MySqlToGoogleCloudStorageOperator指定扩展名为.sql的template_ext

首先在Dag中,指定放置.sql文件的路径

dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath = ['/home/ubuntu/airflow/.../myfolder'])

在yourfile.sql中放入您的大型查询。注意params.ord_id
SELECT * FROM orders where orderid> {{ params.ord_id }}

现在,在运算符的sql参数中,传递文件名。
import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql='yourfile.sql',
    params={"ord_id":99},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

重要的是,不要在该文件名之后放置空格。这是因为Jinja模板引擎将查找以.sql结尾的字符串,如果这样做,它将把它视为文件而不是字符串。

关于airflow - 如何将带有参数的SQL作为文件传递给Airflow Operator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52688757/

相关文章:

python - Airflow 中是否有运算符可以根据 BigQuery 中的查询创建表?

airflow - 在 BigQueryOperator 中将参数添加为 template_fields

kubernetes - 将自定义 volumeMount 添加到 Airflow worker pod 中(使用 k8s Executor)

Python 操作符中的 Airflow 宏

python - Airflow BashOperator 不起作用,但 PythonOperator 可以

airflow - 如何在使用 Airflow 实现的工作流中等待 DAG 任务中的异步事件?

Apache Airflow 中的 Kubernetes 执行器 : Pod getting deleted immediately with error

python - 清除后 Airflow 强制重新运行上游任务,即使下游任务标记为成功

celery - Airflow 1.10.1 在哪里存储其三个服务(即网络服务器、调度程序和工作线程)的日志

python - 尝试使用 DataProcSparkOperator 任务加载 DAG 的 AttributeError