python - 如何在 pyspark 中按字母顺序对嵌套结构的列进行排序?

标签 python apache-spark struct pyspark

我有以下架构的数据。我希望所有列都应按字母顺序排序。我想要它在 pyspark 数据框中。

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)

下面的代码只对外部列进行排序,而不对嵌套列进行排序。

>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()

这段代码之后的模式看起来像

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

(因为id处有下划线,所以先出现)

我想要的模式如下。 (地址里面的列也要排序)

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

提前致谢。

最佳答案

这是一个适用于任意深度嵌套的 StructType 的解决方案,它不依赖于对任何列名进行硬编码。

为了演示,我创建了以下稍微复杂的模式,其中在 address 列中有第二层嵌套。假设您的 DataFrame schema 如下:

df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- city: string (nullable = true)
# |    |-- zip: struct (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |    |    |-- first5: integer (nullable = true)
# |    |-- street: string (nullable = true)

注意 address.zip 字段,其中包含 2 个乱序子字段。

您可以定义一个函数,该函数将递归地遍历您的架构并对字段进行排序以构建 Spark-SQL 选择表达式:

from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':

            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'], 
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))

            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols

在这个 DataFrame 的模式上运行它会产生(为了便于阅读,我将长“地址”字符串分成两行):

print(schemaToSelectExpr(df.schema))
#['_id',
#'struct(address.city,address.pin,address.street,
#        struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']

现在使用 selectExpr 对列进行排序:

df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |    |-- zip: struct (nullable = false)
# |    |    |-- first5: integer (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)

关于python - 如何在 pyspark 中按字母顺序对嵌套结构的列进行排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57821538/

相关文章:

python - 获取字典中最大的n个键值

python - Pandas,如果包含 Nan,整数变量将变为 float

java - Spark 支持 gzip 格式吗?

c++ - 寻找一种重新初始化结构的简单方法

c++ - 为什么我的 For 循环没有完成收集数据以保存在动态分配的数组中?

c++ - union 、类和结构在哪里使用?

python - 如何使用 tkinter 中的按钮设置 `Entry` 小部件的文本/值/内容

python - 在 Windows 上使用 conda 或与 conda 一起安装 Python alpha 和 beta 版本时,我有哪些选择?

python - PySpark 从 TimeStampType 列向 DataFrame 添加一列

python - 使用 python 进行 Spark 流处理时出现错误?