json - 从 Hive 表中的 json 字符串中提取值

标签 json pyspark hive apache-spark-sql

我有如下数据

+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|asset_id  |chg_log                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|4455628986|[{'oldValues': [], 'newValues': ['COMPO_MAIL'], 'fieldName': 'Communication Type', 'fieldType': 'TEXT', 'lookupInfo': {'fieldType': 'TEXT'}, 'additional': {'GROUP_QUALIFIER': [{'qualifiers': [{'lookupKey': 'Communication_Type', 'value': 'COMPO_MAIL'}]}]}}, {'oldValues': [], 'lookupInfo': {'isClientLevel': False, 'fieldType': 'DATE'}, 'fieldName': 'Delivery Due Date', 'fieldType': 'DATE', 'newValues': ['1601771520000']}, {'oldValues': [], 'lookupInfo': {'lookupType': 'CUST_ID', 'fieldType': 'CUST_ID'}, 'fieldName': 'Customer Id', 'fieldType': 'CUST_ID', 'newValues': ['10486']}, {'oldValues': [], 'lookupInfo': {'isClientLevel': False, 'fieldType': 'DROPDOWN'}, 'fieldName': 'Process_Status', 'fieldType': 'PICKLIST', 'newValues': ['Request Review']}]  |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

chg_log 列的 Json 字符串中,我想提取 fieldName - Process_Status 的值及其 newValues 作为请求审核。预期结果如下。

+----------+---------------+
|asset_id  |Process_Status |
+----------+---------------+
|4455628986|Request Review |
+----------+---------------+

json每次的顺序并不相同。有时 Process_Status 首先出现在 json 字符串中,然后是通信类型等等...... 我尝试使用函数 json_extract 但无法获取它。
我如何在 Spark Sql 或 Pyspark 或 Hive 中实现此目的。有人可以帮我吗?

提前致谢

最佳答案

您的列 chg_log 似乎是一个字符串化 Python 字典,而不是有效的 JSON 字符串。

在 Pyspark 中,您可以使用 UDF 将字典转换为 json,然后使用 from_json 将其转换为结构体数组,最后过滤数组以查找字段 Process_Status :

import ast
from pyspark.sql import functions as F

dict_to_json = F.udf(lambda x: json.dumps(ast.literal_eval(x)))

df = df.withColumn("chg_log", dict_to_json(F.col("chg_log")))

df1 = df.withColumn(
    "chg_log",
    F.from_json("chg_log", F.schema_of_json(df.select("chg_log").head()[0]))
).withColumn(
    "chg_log",
    F.expr("filter(chg_log, x -> x.fieldName = 'Process_Status')")[0]
).select(
    F.col("asset_id"), F.col("chg_log.newValues").alias("Process_Status")
)

df1.show()
# +----------+----------------+
# |  asset_id|  Process_Status|
# +----------+----------------+
# |4455628986|[Request Review]|
# +----------+----------------+

另一种方法是直接在 UDF 中进行查找:

parse_status = F.udf(
    lambda x: next(i["newValues"] for i in ast.literal_eval(x) if i["fieldName"] == "Process_Status"),
    ArrayType(StringType())
)

df1 = df.select(F.col("asset_id"), parse_status(F.col("chg_log")).alias("Process_Status"))

关于json - 从 Hive 表中的 json 字符串中提取值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66756582/

相关文章:

apache-spark - Hive 元存储中的上次访问时间更新

python - 在python3.6中将两个jsonl(json行)文件合并并写入到一个新的jsonl文件中

pyspark 滞后函数(基于列)

javascript - 请求建议 - 使用 json/jquery 和 ajax 在函数之间传递变量

hadoop - 如何将存储在包含行的HDFS中的文本文件转换为Pyspark中的数据框?

pandas - Spark 与 Scala 和 Pandas

arrays - 将配置单元数组列转换为映射列

csv - HIVE 因不工作而逃脱 '\\'

java - 将 xml 解析为 java 对象时没有单字符串构造函数/工厂方法错误

javascript - JSON写入文件不会更新,只是替换