如主题所述,我有一个 PySpark Dataframe,我需要将三列合并成行。每列本质上代表一个类别中的一个事实。最终目标是将数据汇总为每个类别的单一总数。
此数据帧中有数千万行,因此我需要一种方法来在 spark 集群上进行转换,而无需将任何数据返回给驱动程序(在本例中为 Jupyter)。
这是我的几家商店的数据框的摘录:
+------------+----------------+----------------+-- --------------+
| store_id |手上牛奶数量|手上面包数量|手上鸡蛋数量|
+------------+----------------+----------------+-- --------------+
| 100| 30| 105| 35|
| 200| 55| 85| 65|
| 300| 20| 125| 90|
+------------+----------------+----------------+-- --------------+
这是所需的结果数据框,每个商店多行,其中原始数据框的列已融合到新数据框的行中,新类别列中每个原始列一行:
+------------+--------+----------+
| product_id|类别|手头数量|
+------------+--------+----------+
| 100|牛奶| 30|
| 100|面包| 105|
| 100|鸡蛋| 35|
| 200|牛奶| 55|
| 200|面包| 85|
| 200|鸡蛋| 65|
| 300|牛奶| 20|
| 300|面包| 125|
| 300|鸡蛋| 90|
+------------+--------+----------+
最终,我想聚合生成的数据框以获得每个类别的总数:
+--------+----------------+
|类别|total_qty_on_hand|
+--------+----------------+
|牛奶| 105|
|面包| 315|
|鸡蛋| 190|
+--------+----------------+
更新: 有提示这个问题是重复的,可以回答here .事实并非如此,因为该解决方案将行转换为列,而我需要进行相反的操作,将列融合为行。
最佳答案
我们可以使用explode()功能来解决这个问题。在 Python 中,同样的事情可以用 melt
# Loading the requisite packages
from pyspark.sql.functions import col, explode, array, struct, expr, sum, lit
# Creating the DataFrame
df = sqlContext.createDataFrame([(100,30,105,35),(200,55,85,65),(300,20,125,90)],('store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'))
df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
| 100| 30| 105| 35|
| 200| 55| 85| 65|
| 300| 20| 125| 90|
+--------+----------------+-----------------+----------------+
编写下面的函数,它将分解
这个DataFrame:
def to_explode(df, by):
# Filter dtypes and split into column names and type description
cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
# Spark SQL supports only homogeneous columns
assert len(set(dtypes)) == 1, "All columns have to be of the same type"
# Create and explode an array of (column_name, column_value) structs
kvs = explode(array([
struct(lit(c).alias("CATEGORY"), col(c).alias("qty_on_hand")) for c in cols
])).alias("kvs")
return df.select(by + [kvs]).select(by + ["kvs.CATEGORY", "kvs.qty_on_hand"])
应用此 DataFrame 上的函数来分解
它-
df = to_explode(df, ['store_id'])\
.drop('store_id')
df.show()
+-----------------+-----------+
| CATEGORY|qty_on_hand|
+-----------------+-----------+
| qty_on_hand_milk| 30|
|qty_on_hand_bread| 105|
| qty_on_hand_eggs| 35|
| qty_on_hand_milk| 55|
|qty_on_hand_bread| 85|
| qty_on_hand_eggs| 65|
| qty_on_hand_milk| 20|
|qty_on_hand_bread| 125|
| qty_on_hand_eggs| 90|
+-----------------+-----------+
现在,我们需要从 CATEGORY
列中删除字符串 qty_on_hand_
。可以使用 expr() 来完成功能。注意 expr
遵循基于 1 的子字符串索引,而不是 0 -
df = df.withColumn('CATEGORY',expr('substring(CATEGORY, 13)'))
df.show()
+--------+-----------+
|CATEGORY|qty_on_hand|
+--------+-----------+
| milk| 30|
| bread| 105|
| eggs| 35|
| milk| 55|
| bread| 85|
| eggs| 65|
| milk| 20|
| bread| 125|
| eggs| 90|
+--------+-----------+
最后,使用 agg() 聚合按 CATEGORY
分组的列 qty_on_hand
功能-
df = df.groupBy(['CATEGORY']).agg(sum('qty_on_hand').alias('total_qty_on_hand'))
df.show()
+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
| eggs| 190|
| bread| 315|
| milk| 105|
+--------+-----------------+
关于python - PySpark Dataframe 将列融为行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55378047/