python - PySpark to_json loses the column names of structs inside an array

Tags: python, dataframe, apache-spark, pyspark, apache-spark-sql

I am trying to generate a JSON string from a nested PySpark DataFrame, but I am losing the key names. My initial dataset looks like the following:

data = [
    {"foo": [1, 2], "bar": [4, 5], "buzz": [7, 8]},
    {"foo": [1], "bar": [4], "buzz": [7]},
    {"foo": [1, 2, 3], "bar": [4, 5, 6], "buzz": [7, 8, 9]},
]
df = spark.read.json(sc.parallelize(data))
df.show()
## +---------+---------+---------+
## |      bar|     buzz|      foo|
## +---------+---------+---------+
## |   [4, 5]|   [7, 8]|   [1, 2]|
## |      [4]|      [7]|      [1]|
## |[4, 5, 6]|[7, 8, 9]|[1, 2, 3]|
## +---------+---------+---------+
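(For reproducing this outside a spark-shell or notebook, here is a minimal setup sketch; the spark and sc names used in the snippets come from an active session, and the app name below is just a placeholder:)

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Assumed setup: a plain local SparkSession. `spark` and `sc` in the
# snippets refer to these objects.
spark = SparkSession.builder.appName("arrays-zip-demo").getOrCreate()
sc = spark.sparkContext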

I then use arrays_zip to zip the columns together:

df_zipped = (
    df
    .withColumn(
        "zipped",
        F.arrays_zip(
            F.col("foo"),
            F.col("bar"),
            F.col("buzz"),
        )
    )
)
df_zipped.printSchema()
root
 |-- bar: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- buzz: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- foo: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- zipped: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- foo: long (nullable = true)
 |    |    |-- bar: long (nullable = true)
 |    |    |-- buzz: long (nullable = true)
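Note that the field names foo, bar, and buzz are clearly present in this schema, and ordinary struct-field access by name still works; the names only disappear during JSON serialization. A quick sanity check, using the df_zipped from above:

# Field access by name resolves fine against the zipped structs;
# with the sample data this selects 1 for every row.
df_zipped.select(F.col("zipped")[0]["foo"]).show()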

The problem comes when calling to_json on the zipped array: it loses the foo, bar, and buzz keys and stores the element indices as the keys instead:

(
    df_zipped
    .withColumn("zipped", F.to_json("zipped"))
    .select("zipped")
    .show(truncate=False)
)
+-------------------------------------------------------------+
|zipped                                                       |
+-------------------------------------------------------------+
|[{"0":1,"1":4,"2":7},{"0":2,"1":5,"2":8}]                    |
|[{"0":1,"1":4,"2":7}]                                        |
|[{"0":1,"1":4,"2":7},{"0":2,"1":5,"2":8},{"0":3,"1":6,"2":9}]|
+-------------------------------------------------------------+

How do I keep "bar", "buzz", and "foo" as the keys instead of 0, 1, 2?

Best Answer

Manually specifying the schema also works: for the foo, bar, and buzz fields, the cast has to be applied to the array wrapping the struct elements (ArrayType(schema) below), not to the actual data fields themselves.

data = [
    {"foo": [1, 2], "bar": [4, 5], "buzz": [7, 8]},
    {"foo": [1], "bar": [4], "buzz": [7]},
    {"foo": [1, 2, 3], "bar": [4, 5, 6], "buzz": [7, 8, 9]},
]
df = spark.read.json(sc.parallelize(data))
df.show()
+---------+---------+---------+
|      bar|     buzz|      foo|
+---------+---------+---------+
|   [4, 5]|   [7, 8]|   [1, 2]|
|      [4]|      [7]|      [1]|
|[4, 5, 6]|[7, 8, 9]|[1, 2, 3]|
+---------+---------+---------+

Then define the schema manually and cast the column to it:

from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

schema = StructType([
    StructField("foo", IntegerType()),
    StructField("bar", IntegerType()),
    StructField("buzz", IntegerType()),
])

df_zipped = (
    df
    .select(
        F.arrays_zip(
            F.col("foo"),
            F.col("bar"),
            F.col("buzz"),
        )
        .alias("zipped")
    )
    .filter(F.col("zipped").isNotNull())
    .select(F.col("zipped").cast(ArrayType(schema)))
)

This produces the desired result:

(
    df_zipped
    .withColumn("zipped", F.to_json("zipped"))
    .select("zipped")
    .show(truncate=False)
)
+----------------------------------------------------------------------------------+
|zipped                                                                            |
+----------------------------------------------------------------------------------+
|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8}]                           |
|[{"foo":1,"bar":4,"buzz":7}]                                                      |
|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8},{"foo":3,"bar":6,"buzz":9}]|
+----------------------------------------------------------------------------------+
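As a follow-up sketch, the same element schema can be reused with from_json to round-trip the JSON string back into structured data, names intact:

# Round-trip check (sketch): serialize, then parse back with the same schema.
df_roundtrip = (
    df_zipped
    .withColumn("zipped_json", F.to_json("zipped"))
    .withColumn("zipped_back", F.from_json("zipped_json", ArrayType(schema)))
)
df_roundtrip.printSchema()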

Note: casting to LongType in the schema does not work, presumably because the element values are already longs, so Spark treats the cast as a no-op and never rewrites the field names; the IntegerType cast forces a real conversion that applies the new names.
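If the Integer/Long restriction matters for your data (an IntegerType cast truncates values outside the 32-bit range), one alternative sketch sidesteps the cast entirely by building the zipped structs with explicitly named fields via Spark SQL's higher-order functions. This assumes Spark >= 2.4 and, as in the example, arrays of equal, non-zero length; the values stay longs:

# Zip manually: iterate over the indices and build named structs,
# so the field names never depend on arrays_zip or on a cast.
df_zipped_alt = df.withColumn(
    "zipped",
    F.expr(
        "transform(sequence(0, size(foo) - 1), "
        "i -> named_struct('foo', foo[i], 'bar', bar[i], 'buzz', buzz[i]))"
    ),
)

to_json on this column emits the foo, bar, and buzz keys directly, because the names are baked into the structs rather than patched on afterwards.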

This post about PySpark to_json losing the column names of structs in an array is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/63816830/
