python - Airflow - params 内的 Jinja 模板(postgresoperator)

标签 python sql airflow

我在 Airflow DAG 中使用了许多 Postgres 运算符(使用 for 循环构建它),我想知道是否可以将 JINJA 模板传递给我的运算符,例如:

params = {'max_field': '{{ ti.xcom_pull(task_ids="get_max_field_' + table + '", key="max_field") }}'} 

这样在 .sql 文件中我只需要一个看起来像这样的查询

SELECT .... FROM .... WHERE ... > '{{ params.max_field }}'

我的问题是我现在有太多查询,无法直接执行

SELECT .... FROM .... WHERE ... > '{{ ti.xcom_pull(task_ids="get_max_field_table1'", key="max_field") }}' 

存在很多错误风险(例如,当人们进行复制粘贴时,写入 table2 而不是 table1)。

我尝试将参数放入 template_fields = ('sql', 'parameters', 'params') 但它不起作用,它用 '{{ ti.xcom_pull(task_ids="get_max_field_table1'", key 渲染我的查询="max_field") }}' 而不是值。

我还尝试使用参数而不是 params,然后执行以下操作:

SELECT .... FROM .... WHERE ... > '%(max_field)s'

但是如果查询中有 %,那么我的查询就会出现问题:

SELECT .... FROM .... WHERE ... > '%(max_field)s' and text like '%hello%'

最佳答案

params 不支持 Jinja 模板。

关于python - Airflow - params 内的 Jinja 模板(postgresoperator),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53729515/

相关文章:

python - 如何在 Python3 中将字符串转换为 unicode?

python - OverflowError : (34, 'Numerical result out of range' ) 在编写 pow(x,n) 的自定义实现时

python - Airflow ExternalTask​​Sensor 卡住了

python - 将 numpy 数组转换为 pandas 数据框

python - 当我使用对象名称时,我得到 'DeferredAttribute' 对象没有属性 'get'

sql - 从 SQL 查询中获取表名

mysql - 返回的node-mysql JSON类型用反斜杠引用

SQL Server 在存储过程期间锁定表

python - 如何有效地使 Airflow dag 定义成为数据库驱动的

cron - 手动触发 Airflow DAG 会干扰预定的 Airflow 触发吗?