python - PySpark Dataframe 将列融为行

标签 python dataframe pyspark aggregate melt

如主题所述,我有一个 PySpark Dataframe,我需要将三列合并成行。每列本质上代表一个类别中的一个事实。最终目标是将数据汇总为每个类别的单一总数。

此数据帧中有数千万行,因此我需要一种方法来在 spark 集群上进行转换,而无需将任何数据返回给驱动程序(在本例中为 Jupyter)。

这是我的几家商店的数据框的摘录: +------------+----------------+----------------+-- --------------+ | store_id |手上牛奶数量|手上面包数量|手上鸡蛋数量| +------------+----------------+----------------+-- --------------+ | 100| 30| 105| 35| | 200| 55| 85| 65| | 300| 20| 125| 90| +------------+----------------+----------------+-- --------------+

这是所需的结果数据框,每个商店多行,其中原始数据框的列已融合到新数据框的行中,新类别列中每个原始列一行: +------------+--------+----------+ | product_id|类别|手头数量| +------------+--------+----------+ | 100|牛奶| 30| | 100|面包| 105| | 100|鸡蛋| 35| | 200|牛奶| 55| | 200|面包| 85| | 200|鸡蛋| 65| | 300|牛奶| 20| | 300|面包| 125| | 300|鸡蛋| 90| +------------+--------+----------+

最终,我想聚合生成的数据框以获得每个类别的总数: +--------+----------------+ |类别|total_qty_on_hand| +--------+----------------+ |牛奶| 105| |面包| 315| |鸡蛋| 190| +--------+----------------+

更新: 有提示这个问题是重复的,可以回答here .事实并非如此,因为该解决方案将行转换为列,而我需要进行相反的操作,将列融合为行。

最佳答案

我们可以使用explode()功能来解决这个问题。在 Python 中,同样的事情可以用 melt

来完成
# Loading the requisite packages 
from pyspark.sql.functions import col, explode, array, struct, expr, sum, lit        
# Creating the DataFrame
df = sqlContext.createDataFrame([(100,30,105,35),(200,55,85,65),(300,20,125,90)],('store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'))
df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

编写下面的函数,它将分解这个DataFrame:

def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("CATEGORY"), col(c).alias("qty_on_hand")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.CATEGORY", "kvs.qty_on_hand"])

应用此 DataFrame 上的函数来分解它-

df = to_explode(df, ['store_id'])\
     .drop('store_id')
df.show()
+-----------------+-----------+
|         CATEGORY|qty_on_hand|
+-----------------+-----------+
| qty_on_hand_milk|         30|
|qty_on_hand_bread|        105|
| qty_on_hand_eggs|         35|
| qty_on_hand_milk|         55|
|qty_on_hand_bread|         85|
| qty_on_hand_eggs|         65|
| qty_on_hand_milk|         20|
|qty_on_hand_bread|        125|
| qty_on_hand_eggs|         90|
+-----------------+-----------+

现在,我们需要从 CATEGORY 列中删除字符串 qty_on_hand_。可以使用 expr() 来完成功能。注意 expr 遵循基于 1 的子字符串索引,而不是 0 -

df = df.withColumn('CATEGORY',expr('substring(CATEGORY, 13)'))
df.show()
+--------+-----------+
|CATEGORY|qty_on_hand|
+--------+-----------+
|    milk|         30|
|   bread|        105|
|    eggs|         35|
|    milk|         55|
|   bread|         85|
|    eggs|         65|
|    milk|         20|
|   bread|        125|
|    eggs|         90|
+--------+-----------+

最后,使用 agg() 聚合按 CATEGORY 分组的列 qty_on_hand功能-

df = df.groupBy(['CATEGORY']).agg(sum('qty_on_hand').alias('total_qty_on_hand'))
df.show()
+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    eggs|              190|
|   bread|              315|
|    milk|              105|
+--------+-----------------+

关于python - PySpark Dataframe 将列融为行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55378047/

相关文章:

python - 比较文件夹内容

python - 向 Pandas DataFrame 添加一个新列,并使用来自单独 DataFrame 的编码数据而不使用循环?

apache-spark - PySpark:如何重新采样频率

python - 迭代目录中的文件并使用文件名作为变量,并将文件路径分配给变量

python - 使用 Cron 从其目录运行程序

python - 如何在 Python 中创建多个空列表?

python - search() 中的 Elasticsearch-py 无法识别 'analyzer' 参数

python - 修复 x 轴 matplotlib 的比例

python - 分组箱线图

python - 如何获取 Airflow dag 运行的 JobID?