apache-spark - 将组计数列添加到 PySpark 数据帧

标签 apache-spark pyspark dplyr

我来自 R 和 tidyverse到 PySpark,因为它具有出色的 Spark 处理能力,我正在努力将某些概念从一个上下文映射到另一个上下文。

特别是,假设我有一个如下所示的数据集

x | y
--+--
a | 5
a | 8
a | 7
b | 1

我想添加一列,其中包含每个 x 的行数值(value),像这样:
x | y | n
--+---+---
a | 5 | 3
a | 8 | 3
a | 7 | 3
b | 1 | 1

在 dplyr 中,我只想说:

import(tidyverse)

df <- read_csv("...")
df %>%
    group_by(x) %>%
    mutate(n = n()) %>%
    ungroup()

就是这样。如果我正在寻找 ,我可以在 PySpark 中做一些几乎同样简单的事情总结 按行数:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .count() \
    .show()

我以为我明白 withColumn相当于 dplyr 的 mutate .但是,当我执行以下操作时,PySpark 告诉我 withColumn未为 groupBy 定义数据:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .withColumn("n", count("x")) \
    .show()

在短期内,我可以简单地创建包含计数的第二个数据帧并将其连接到原始数据帧。但是,在大表的情况下,这似乎会变得效率低下。实现此目的的规范方法是什么?

最佳答案

当你做 groupBy() ,您必须先指定聚合,然后才能显示结果。例如:

import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
]
df = sqlCtx.createDataFrame(data, ["x", "y"])
df.groupBy('x').count().select('x', f.col('count').alias('n')).show()
#+---+---+
#|  x|  n|
#+---+---+
#|  b|  1|
#|  a|  3|
#+---+---+

这里我用了 alias()重命名列。但这每组只返回一行。如果您希望所有行都附加计数,则可以使用 Window 来执行此操作。 :

from pyspark.sql import Window
w = Window.partitionBy('x')
df.select('x', 'y', f.count('x').over(w).alias('n')).sort('x', 'y').show()
#+---+---+---+
#|  x|  y|  n|
#+---+---+---+
#|  a|  5|  3|
#|  a|  7|  3|
#|  a|  8|  3|
#|  b|  1|  1|
#+---+---+---+

或者,如果您更熟悉 SQL,则可以将数据框注册为临时表并利用 pyspark-sql做同样的事情:

df.registerTempTable('table')
sqlCtx.sql(
    'SELECT x, y, COUNT(x) OVER (PARTITION BY x) AS n FROM table ORDER BY x, y'
).show()
#+---+---+---+
#|  x|  y|  n|
#+---+---+---+
#|  a|  5|  3|
#|  a|  7|  3|
#|  a|  8|  3|
#|  b|  1|  1|
#+---+---+---+

关于apache-spark - 将组计数列添加到 PySpark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48793701/

相关文章:

apache-spark - IllegalArgumentException : Column must be of type struct<type:tinyint,大小:int,索引:array<int>,值:array<double>> but was actually double.'

arrays - 使用数组值合并两个 Spark 数据帧

R:避免使用 for 循环顺序选择一列中的值并使用另一列中的值向量应用函数

hadoop - 运行我的 spark 作业时,出现此错误 : id: <app-name>: No such user

shell - 在 yarn 客户端错误上运行 spark shell

scala - 根据 Spark 中的特定值提取行

apache-spark - Spark查询运行很慢

python - PySpark 窗口不适用于指定的整数范围

r - 使用 multidplyr 在 dplyr::do 中调用带参数的函数

r - dplyr:根据列的匹配字符串改变列