python - Variable.get ('x' , deserialize_json=True) 与 Jinja '{{ var.json.x }}' 相同吗?

标签 python json airflow

我有一个Variable存储方式如下:

x | {"a": 1, "b": 2}

我想在我的 DAG 脚本中检索这个完整的 JSON 作为字典。作品如下:

from airflow.models import Variable
op = CustomOperator(
  templated_field = Variable.get('x', deserialize_json = True)
)
# called in CustomOperator:
templated_field.keys()
# dict_keys(['a', 'b'])

documentation建议以下内容应该是等效的:

The second call [using Variable.get(deserialize_json=True)] assumes json content and will be deserialized...

...or if you need to deserialize a json object from the variable [using a Jinja template]:

{{ var.json.<variable_name> }}

但是,当我使用它然后尝试提取 key 时,我收到错误:

op = CustomOperator(
  templated_field = '{{ var.json.x }}'
)
# called in CustomOperator
templated_field.keys()

AttributeError: 'str' object has no attribute 'keys'

该错误表明实际上 json 未反序列化:

# same error
"{'a': 1, 'b': 2}".keys()

使用 var.json 的唯一示例我在网上发现的方法不是从 JSON 中提取 dict,而是使用 JSON 路径来提取标量:

# https://www.applydatascience.com/airflow/airflow-variables/
{{ var.json.example_variables_config.var3 }}
# https://gist.github.com/kaxil/61f41dd87a69230d1a637dc3a1d2fa2c
{{ var.json.dag1_config.var1 }}

Hossein 很有帮助地指出该字段应该模板化;这是来自 CustomOperator 的行:

class CustomOperator(SuperCustomOperator):
    template_fields = SuperCustomOperator.template_fields + ('template_field',)

我错过了什么吗?是金贾var.json该方法仅适合提取标量,还是也可以用于提取 JSON-as-dict?

最佳答案

我找到了this answer这帮助我解决了将字典作为变量传递的类似问题。 问题是 Jinja2 的 {{ var.json.x }} echo 是 var.json.x 的值,因此它转换了 dictstr ,它的副作用是将类似 json 的 str 中的双引号更改为单引号,因此您无法直接将字符串加载为 json-string。

我的解决方案是添加一个自定义过滤器,它将字典呈现为 json 字符串(需要时使用双引号),然后我可以将其转换回函数内的字典。

代码:

dag = DAG(
    ...
    user_defined_filters={'tojson': lambda s: json.dumps(s)},
)

op = PythonOperator(
    ...
    op_kwargs = { "input_dict" : {{ var.json.x | tojson }} }
    # also applicable to other templated fields
    python_callable = func
)

def func(input_dict):
    input_dict = json.loads(input_dict)
    ...

它按预期工作,现在我可以使用 input_dict 作为字典

关于python - Variable.get ('x' , deserialize_json=True) 与 Jinja '{{ var.json.x }}' 相同吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59420808/

相关文章:

python - 为什么 Airflow Scheduler 只能作为非守护进程工作,而作为守护进程却失败?

python - Google 平台上的 Composer 不适用于 Python 3

python - airflow trigger_dag execution_date 是第二天,为什么?

python - 从 python 的日志记录器中删除处理程序

javascript - 加载 db.json 时出错

c# - 发出 JSON 的 WCF Web 服务(菜鸟建议)

json - React-Native Fetch "POST"请求在 Android 中抛出 "SyntaxError: Unexpected end of JSON input"

python - Django 随机排序列表

python - 无法在终端 : "execution_count": null 中执行 jupyter notebook

python - 我如何sudo当前进程?