假设我有一个 PySpark 数据框 df
:
>>> df.printSchema()
root
|-- a: struct
|-- alpha: integer
|-- beta: string
|-- gamma: boolean
|-- b: string
|-- c: struct
|-- delta: string
|-- epsilon: struct
|-- omega: string
|-- psi: boolean
我知道我可以展平数据框:
select_col_list = [col.replace("a", "a.*").replace("c", "c.*") for col in df.columns]
flat_df = df.select(*select_col_list)
这导致了这样的架构:
root
|-- alpha: integer
|-- beta: string
|-- gamma: boolean
|-- b: string
|-- delta: string
|-- epsilon: struct
|-- omega: string
|-- psi: boolean
但是我也想在展平时将超列的名称附加到子列,所以我希望生成的架构如下所示:
root
|-- a_alpha: integer
|-- a_beta: string
|-- a_gamma: boolean
|-- b: string
|-- c_delta: string
|-- c_epsilon: struct
|-- omega: string
|-- psi: boolean
我该怎么做?
最佳答案
我不认为有一种直接的方法可以做到这一点,但这是我想出的一个 hacky 解决方案。
- 定义要展开的列的列表并使用
pyspark.sql.functions.monotonically_increasing_id()
创建一个临时的id
列. - 遍历数据框中的所有列并为每一列创建一个临时数据框。
- 如果列在
cols_to_expand
中:使用.*
展开列。然后使用alias()
使用相应的前缀重命名结果(临时)数据框中的所有字段(id
除外)| . - 如果该列不在
cols_to_expand
中:选择该列和id
并将其存储在临时数据框中。
- 如果列在
- 将
temp_df
存储在列表中。 - 使用
id
加入列表中的所有数据帧并删除id
列。
代码:
df = df.withColumn('id', f.monotonically_increasing_id())
cols_to_expand = ['a', 'c']
flat_dfs = []
for col in df.columns:
if col in cols_to_expand:
temp_df = df.select('id', col+".*")
temp_df = temp_df.select(
[
f.col(c).alias(col+"_"+c if c != 'id' else c) for c in temp_df.columns
]
)
else:
temp_df = df.select('id', col)
flat_dfs.append(temp_df)
flat_df = reduce(lambda x, y: x.join(y, on='id'), flat_dfs)
flat_df = flat_df.drop('id')
flat_df.printSchema()
结果模式:
flat_df.printSchema()
#root
# |-- a_alpha: integer (nullable = true)
# |-- a_beta: string (nullable = true)
# |-- a_gamma: boolean (nullable = true)
# |-- b: string (nullable = true)
# |-- c_delta: string (nullable = true)
# |-- c_epsilon: struct (nullable = true)
# | |-- omega: string (nullable = true)
# | |-- psi: boolean (nullable = true)
关于python - PySpark 在附加超列名称时展平数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49263438/