python - PySpark 在附加超列名称时展平数据框

标签 python apache-spark pyspark apache-spark-sql

假设我有一个 PySpark 数据框 df:

>>> df.printSchema()
root
 |-- a: struct
      |-- alpha: integer
      |-- beta: string
      |-- gamma: boolean
 |-- b: string
 |-- c: struct
      |-- delta: string
      |-- epsilon: struct
           |-- omega: string
           |-- psi: boolean

我知道我可以展平数据框:

select_col_list = [col.replace("a", "a.*").replace("c", "c.*") for col in df.columns]
flat_df = df.select(*select_col_list)

这导致了这样的架构:

root
 |-- alpha: integer
 |-- beta: string
 |-- gamma: boolean
 |-- b: string
 |-- delta: string
 |-- epsilon: struct
      |-- omega: string
      |-- psi: boolean

但是我也想在展平时将超列的名称附加到子列,所以我希望生成的架构如下所示:

root
 |-- a_alpha: integer
 |-- a_beta: string
 |-- a_gamma: boolean
 |-- b: string
 |-- c_delta: string
 |-- c_epsilon: struct
      |-- omega: string
      |-- psi: boolean

我该怎么做?

最佳答案

我不认为有一种直接的方法可以做到这一点,但这是我想出的一个 hacky 解决方案。

  1. 定义要展开的列的列表并使用 pyspark.sql.functions.monotonically_increasing_id() 创建一个临时的 id 列.
  2. 遍历数据框中的所有列并为每一列创建一个临时数据框。
    • 如果列在 cols_to_expand 中:使用 .* 展开列。然后使用 alias() 使用相应的前缀重命名结果(临时)数据框中的所有字段(id 除外)| .
    • 如果该列不在 cols_to_expand 中:选择该列和 id 并将其存储在临时数据框中。
  3. temp_df 存储在列表中。
  4. 使用 id 加入列表中的所有数据帧并删除 id 列。

代码:

df = df.withColumn('id', f.monotonically_increasing_id())
cols_to_expand = ['a', 'c']
flat_dfs = []
for col in df.columns:
    if col in cols_to_expand:
        temp_df = df.select('id', col+".*")
        temp_df = temp_df.select(
            [
                f.col(c).alias(col+"_"+c if c != 'id' else c) for c in temp_df.columns
            ]
        )
    else:
        temp_df = df.select('id', col)

    flat_dfs.append(temp_df)

flat_df = reduce(lambda x, y: x.join(y, on='id'), flat_dfs)

flat_df = flat_df.drop('id')
flat_df.printSchema()

结果模式:

flat_df.printSchema()
#root
# |-- a_alpha: integer (nullable = true)
# |-- a_beta: string (nullable = true)
# |-- a_gamma: boolean (nullable = true)
# |-- b: string (nullable = true)
# |-- c_delta: string (nullable = true)
# |-- c_epsilon: struct (nullable = true)
# |    |-- omega: string (nullable = true)
# |    |-- psi: boolean (nullable = true)

关于python - PySpark 在附加超列名称时展平数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49263438/

相关文章:

java - 如何使用apache-spark快速转换多节点上的大规模数据?

python - 类型错误 : 'JavaPackage' object is not callable

python-3.x - Pyspark UDF 属性错误 : 'NoneType' object has no attribute '_jvm'

python - 维度为 5 000 000 : Save to Database or File? 的对称稀疏矩阵的性能

mongodb - 使用多个主机配置 mongodb Spark 连接器

java - 如何将 csv 字符串转换为 Spark-ML 兼容的 Dataset<Row> 格式?

python - Pyspark - 具有重置条件的累积和

python - python pandas 中的数据配对

python - 在 python 中识别 windows/linux GUI 中的文本以进行自动化测试

Python复制较大的文件太慢