pyspark - 计算 SPARKSQL 中重复行的数量

标签 pyspark apache-spark-sql

我有一个要求,我需要计算 SparkSQL 中 Hive 表的重复行数。

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row
app_name="test"
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from  DV_BDFRAWZPH_NOGBD_R000_SG.employee")

到目前为止,我已经对表名称进行了硬编码,但它实际上是作为参数出现的。话虽这么说,我们也不知道列数或它们的名称。在 python pandas 中,我们有类似 df.duplicated.sum() 的东西来计算重复记录的数量。我们这里有这样的东西吗?

+---+---+---+
| 1 | A | B |
+---+---+---+
| 1 | A | B |
+---+---+---+
| 2 | B | E |
+---+---+---+
| 2 | B | E |
+---+---+---+
| 3 | D | G |
+---+---+---+
| 4 | D | G |
+---+---+---+

此处重复行数为 4。(例如)

最佳答案

您实际上想要对所有列进行 groupBy()count(),然后选择计数大于 1 的行的计数总和。

import pyspark.sql.functions as f
df.groupBy(df.columns)\
    .count()\
    .where(f.col('count') > 1)\
    .select(f.sum('count'))\
    .show()

说明

分组和聚合后,您的数据将如下所示:

+---+---+---+---+
| 1 | A | B | 2 |
+---+---+---+---+
| 2 | B | E | 2 |
+---+---+---+---+
| 3 | D | G | 1 |
+---+---+---+---+
| 4 | D | G | 1 |
+---+---+---+---+

然后使用where()仅过滤计数大于1的行,并选择总和。在本例中,您将获得前 2 行,总和为 4。

关于pyspark - 计算 SPARKSQL 中重复行的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48554619/

相关文章:

amazon-web-services - Pyspark:如何检查s3中是否存在带有通配符的文件路径

apache-spark - 在 PySpark Structured Streaming 中对多个输出流使用单个流式 DataFrame

apache-spark - 如何降低数据框列名的大小写而不是其值?

apache-spark - Kubernetes 上 Apache Spark 结构化流上的长时间 GC 暂停

python - 如何通过对前一行和当前行的值求和来填充 PySpark Dataframe 的行?

python - PySpark - 连接到 s3 - 将文件读取到 rdd

python - 使用 pySpark 计算月末差异

python - 使用 PySpark 将复杂 RDD 转换为扁平化 RDD

azure - 如何根据azure databricks中的列值将静态值传递到动态值

apache-spark - 连接两个(非)配对的 RDD 来创建一个 DataFrame