我有一个Variable
存储方式如下:
x | {"a": 1, "b": 2}
我想在我的 DAG 脚本中检索这个完整的 JSON 作为字典。作品如下:
from airflow.models import Variable
op = CustomOperator(
templated_field = Variable.get('x', deserialize_json = True)
)
# called in CustomOperator:
templated_field.keys()
# dict_keys(['a', 'b'])
documentation建议以下内容应该是等效的:
The second call [using
Variable.get(deserialize_json=True)
] assumes json content and will be deserialized......or if you need to deserialize a json object from the variable [using a Jinja template]:
{{ var.json.<variable_name> }}
但是,当我使用它然后尝试提取 key 时,我收到错误:
op = CustomOperator(
templated_field = '{{ var.json.x }}'
)
# called in CustomOperator
templated_field.keys()
AttributeError: 'str' object has no attribute 'keys'
该错误表明实际上 json 未反序列化:
# same error
"{'a': 1, 'b': 2}".keys()
使用 var.json
的唯一示例我在网上发现的方法不是从 JSON 中提取 dict,而是使用 JSON 路径来提取标量:
# https://www.applydatascience.com/airflow/airflow-variables/
{{ var.json.example_variables_config.var3 }}
# https://gist.github.com/kaxil/61f41dd87a69230d1a637dc3a1d2fa2c
{{ var.json.dag1_config.var1 }}
Hossein 很有帮助地指出该字段应该模板化;这是来自 CustomOperator
的行:
class CustomOperator(SuperCustomOperator):
template_fields = SuperCustomOperator.template_fields + ('template_field',)
我错过了什么吗?是金贾var.json
该方法仅适合提取标量,还是也可以用于提取 JSON-as-dict?
最佳答案
我找到了this answer这帮助我解决了将字典作为变量传递的类似问题。
问题是 Jinja2 的 {{ var.json.x }}
echo 是 var.json.x
的值,因此它转换了 dict
到 str
,它的副作用是将类似 json 的 str 中的双引号更改为单引号,因此您无法直接将字符串加载为 json-string。
我的解决方案是添加一个自定义过滤器,它将字典呈现为 json 字符串(需要时使用双引号),然后我可以将其转换回函数内的字典。
代码:
dag = DAG(
...
user_defined_filters={'tojson': lambda s: json.dumps(s)},
)
op = PythonOperator(
...
op_kwargs = { "input_dict" : {{ var.json.x | tojson }} }
# also applicable to other templated fields
python_callable = func
)
def func(input_dict):
input_dict = json.loads(input_dict)
...
它按预期工作,现在我可以使用 input_dict
作为字典
关于python - Variable.get ('x' , deserialize_json=True) 与 Jinja '{{ var.json.x }}' 相同吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59420808/