python - 在 Python/Pyspark 中获取每月计数的更有效方法

标签 python sql apache-spark pyspark apache-spark-sql

我有一个表 DF,如下所示

ID   Days
1    30
2    55
3    32
4    12
5    100
.....

我想得到如下计数:

month                           count
30 days and greater             20,000
60 days and greater             15,323
90 days and greater             11,232
.....
3600 days and greater           55

我的代码非常简单明了,我只是为每个月应用过滤器,并获取计数,然后复制并粘贴到 Excel 中,如下所示:

month1 = df.filter("Days >= 30").agg(countDistinct('ID')).show() 
month2 = df.filter("Days>= 60").agg(countDistinct('ID')).show() 
month3 = df.filter("Days >= 90").agg(countDistinct('ID')).show() ....

这确实效率不高。

不知道有没有更简单的方法呢?并像这样创建一个表。

感谢先进!

最佳答案

您需要先将天数除以 30,然后对这些值进行分组:

import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql import Row
from pyspark.sql.window import Window

df = spark.createDataFrame([
    Row(ID=1, Days=30),
    Row(ID=2, Days=55),
    Row(ID=3, Days=32),
    Row(ID=4, Days=12),
    Row(ID=5, Days=100),
    Row(ID=6, Days=3600)
])

# Calculating quantity of months
df = df.withColumn('total_months', f.floor(f.col('Days') / f.lit(30)))

# Group and count distinct
df = df.groupBy('total_months').agg(f.countDistinct('ID').alias('count'))

# Adding description
df = df.withColumn('month', f.concat(f.col('total_months') * f.lit(30), f.lit(' days and greater')))

# Cumulative sum
window = Window.orderBy(f.col('total_months').desc()).rangeBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('count', f.sum('count').over(window))

# Selecting only required columns and sorting asc
(df
 .select('month', 'count')
 .sort('total_months')
 .show(truncate=False))

输出

+---------------------+-----+
|month                |count|
+---------------------+-----+
|0 days and greater   |6    |
|30 days and greater  |5    |
|90 days and greater  |2    |
|3600 days and greater|1    |
+---------------------+-----+

关于python - 在 Python/Pyspark 中获取每月计数的更有效方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67524851/

相关文章:

sql - SUM()/total rows 和 AVG() 在 sql server 之间返回不同的结果

sql - 如何优化/重构 TSQL "LIKE"子句?

python - 如何使用 2 个拆分参数拆分字符串?

python - Gensim 中的 TFIDIF 模型创建类型错误

mysql - 数据库设计 : force a child table to populate with predefined data when the parent record is created

apache-spark - WARN SparkContext:正在构造另一个SparkContext(或在其构造函数中引发了异常)

sql - Spark SQL分组: Add to group by or wrap in first() if you don't care which value you get.;

apache-spark - 在 Pyspark 中将月份名称转换为数字

python - Conda 和 Pycharm 不匹配

python - CountVectorizer 但对于文本组