我在 Airflow 中有一个运算符(operator):
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
现在,我需要运行的实际查询长度为24行。我想将其保存在文件中,并为运算符(operator)提供SQL文件的路径。运营商支持这一点,但是我不确定该如何处理需要SQL的参数。
有什么建议吗?
编辑:
这是我的代码:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
sql = '{{ templates_dict.sql }}',
params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' : TABLE_NAME},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
这给出了:
jinja2.exceptions.UndefinedError: 'templates_dict' is undefined
最佳答案
如您所见,MySqlToGoogleCloudStorageOperator指定扩展名为.sql的template_ext
。
首先在Dag
中,指定放置.sql文件的路径
dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath = ['/home/ubuntu/airflow/.../myfolder'])
在yourfile.sql中放入您的大型查询。注意
params.ord_id
SELECT * FROM orders where orderid> {{ params.ord_id }}
现在,在运算符的
sql
参数中,传递文件名。import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql='yourfile.sql',
params={"ord_id":99},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
重要的是,不要在该文件名之后放置空格。这是因为Jinja模板引擎将查找以
.sql
结尾的字符串,如果这样做,它将把它视为文件而不是字符串。
关于airflow - 如何将带有参数的SQL作为文件传递给Airflow Operator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52688757/