python - PySpark: explode a stringified array of dictionaries into rows

Tags: python apache-spark dataframe pyspark apache-spark-sql

I have a pyspark dataframe with a StringType column (edges) that contains a list of dictionaries (see the example below). The dictionaries hold a mix of value types, including another dictionary (nodeIDs). I need to explode the top-level dictionaries in the edges field into rows; ideally, I should then be able to pull their component values out into separate fields.

Input:

import findspark
findspark.init()

from pyspark.sql import Row, SparkSession

SPARK = SparkSession.builder.enableHiveSupport() \
                    .getOrCreate()

data = [
    Row(trace_uuid='aaaa', timestamp='2019-05-20T10:36:33+02:00', edges='[{"distance":4.382441320292239,"duration":1.5,"speed":2.9,"nodeIDs":{"nodeA":954752475,"nodeB":1665827480}},{"distance":14.48582171131768,"duration":2.6,"speed":5.6,"nodeIDs":{"nodeA":1665827480,"nodeB":3559056131}}]', count=156, level=36),
    Row(trace_uuid='bbbb', timestamp='2019-05-20T11:36:10+03:00', edges='[{"distance":0,"duration":0,"speed":0,"nodeIDs":{"nodeA":520686131,"nodeB":520686216}},{"distance":8.654358326561642,"duration":3.1,"speed":2.8,"nodeIDs":{"nodeA":520686216,"nodeB":506361795}}]', count=179, level=258)
    ]

df = SPARK.createDataFrame(data)

Desired output:

    data_reshaped = [
        Row(trace_uuid='aaaa', timestamp='2019-05-20T10:36:33+02:00', distance=4.382441320292239, duration=1.5, speed=2.9, nodeA=954752475, nodeB=1665827480, count=156, level=36),
        Row(trace_uuid='aaaa', timestamp='2019-05-20T10:36:33+02:00', distance=14.48582171131768, duration=2.6, speed=5.6, nodeA=1665827480, nodeB=3559056131, count=156, level=36),
        Row(trace_uuid='bbbb', timestamp='2019-05-20T11:36:10+03:00', distance=0, duration=0, speed=0, nodeA=520686131, nodeB=520686216, count=179, level=258),
        Row(trace_uuid='bbbb', timestamp='2019-05-20T11:36:10+03:00', distance=8.654358326561642, duration=3.1, speed=2.8, nodeA=520686216, nodeB=506361795, count=179, level=258)
    ]
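The reshaping amounts to three steps: parse the JSON string, emit one output row per edge, and merge the nested nodeIDs keys into the top level alongside the row's other columns. Outside Spark, that logic can be sketched in plain Python (a minimal illustration of the target shape only; `flatten_edges` is a hypothetical helper, not part of any Spark solution):

```python
import json

def flatten_edges(row):
    """Yield one flat dict per edge, merging nested nodeIDs into the top level."""
    for edge in json.loads(row["edges"]):
        node_ids = edge.pop("nodeIDs")  # e.g. {"nodeA": ..., "nodeB": ...}
        # keep every non-edges column, then the edge fields, then the node IDs
        yield {**{k: v for k, v in row.items() if k != "edges"},
               **edge, **node_ids}

row = {"trace_uuid": "aaaa", "timestamp": "2019-05-20T10:36:33+02:00",
       "count": 156, "level": 36,
       "edges": '[{"distance":4.382441320292239,"duration":1.5,"speed":2.9,'
                '"nodeIDs":{"nodeA":954752475,"nodeB":1665827480}},'
                '{"distance":14.48582171131768,"duration":2.6,"speed":5.6,'
                '"nodeIDs":{"nodeA":1665827480,"nodeB":3559056131}}]'}

flat = list(flatten_edges(row))
# two output rows, with nodeA/nodeB promoted to top-level fields
```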

Is there a way to do this? I tried using cast to convert the edges field to an array first, but I couldn't figure out how to make that work with the mixed data types.

I'm using Spark 2.4.0.

Best Answer

You can use from_json() with a JSON schema inferred by schema_of_json(). For example:

from pyspark.sql import functions as F

# a sample json string:  
edges_json_sample = data[0].edges
# or edges_json_sample = df.select('edges').first()[0]

>>> edges_json_sample
#'[{"distance":4.382441320292239,"duration":1.5,"speed":2.9,"nodeIDs":{"nodeA":954752475,"nodeB":1665827480}},{"distance":14.48582171131768,"duration":2.6,"speed":5.6,"nodeIDs":{"nodeA":1665827480,"nodeB":3559056131}}]'

# infer schema from the sample string
schema = df.select(F.schema_of_json(edges_json_sample)).first()[0]

>>> schema
#u'array<struct<distance:double,duration:double,nodeIDs:struct<nodeA:bigint,nodeB:bigint>,speed:double>>'

# convert json string to data structure and then retrieve desired items
new_df = df.withColumn('data', F.explode(F.from_json('edges', schema))) \
           .select('*', 'data.*', 'data.nodeIDs.*') \
           .drop('data', 'nodeIDs', 'edges')
           
>>> new_df.show()
+-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+
|count|level|           timestamp|trace_uuid|         distance|duration|speed|     nodeA|     nodeB|
+-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+
|  156|   36|2019-05-20T10:36:...|      aaaa|4.382441320292239|     1.5|  2.9| 954752475|1665827480|
|  156|   36|2019-05-20T10:36:...|      aaaa|14.48582171131768|     2.6|  5.6|1665827480|3559056131|
|  179|  258|2019-05-20T11:36:...|      bbbb|              0.0|     0.0|  0.0| 520686131| 520686216|
|  179|  258|2019-05-20T11:36:...|      bbbb|8.654358326561642|     3.1|  2.8| 520686216| 506361795|
+-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+

# expected result
data_reshaped = new_df.rdd.collect()
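As an alternative to inferring the schema from a sample row (which runs an extra job just to collect the sample), from_json() in Spark 2.4 also accepts a schema given as a DDL-formatted string, keeping the expected types explicit in the code. A sketch of the equivalent DDL string, matching the schema inferred above (the pyspark calls are shown as comments, assuming the same `df` and `F` as above):

```python
# DDL-format schema string matching the one schema_of_json inferred above
edge_schema_ddl = (
    "array<struct<"
    "distance:double,duration:double,"
    "nodeIDs:struct<nodeA:bigint,nodeB:bigint>,"
    "speed:double>>"
)

# With pyspark available, it is used exactly like the inferred schema:
# new_df = df.withColumn('data', F.explode(F.from_json('edges', edge_schema_ddl))) \
#            .select('*', 'data.*', 'data.nodeIDs.*') \
#            .drop('data', 'nodeIDs', 'edges')
```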

This question about exploding a stringified array of dictionaries into rows in PySpark was found as a similar question on Stack Overflow: https://stackoverflow.com/questions/56590112/
