python - 通过过滤对 Pyspark Dataframe 进行分组

标签 python apache-spark pyspark apache-spark-sql

我有如下数据框

cust_id   req    req_met
-------   ---    -------
 1         r1      1
 1         r2      0
 1         r2      1
 2         r1      1
 3         r1      1
 3         r2      1
 4         r1      0
 5         r1      1
 5         r2      0
 5         r1      1

我必须看看客户,看看他们有多少要求,看看他们是否至少满足过一次。同一客户和要求可以有多个记录,一个满足和不满足。在上述情况下,我的输出应该是

cust_id
-------
  1
  2
  3

我做的是

# say initial dataframe is df
df1 = df\
    .groupby('cust_id')\
    .countdistinct('req')\
    .alias('num_of_req')\
    .sum('req_met')\
    .alias('sum_req_met')

df2 = df1.filter(df1.num_of_req == df1.sum_req_met)

但在少数情况下它得不到正确的结果

如何做到这一点?

最佳答案

首先,我只准备上面给出的玩具数据集,

from pyspark.sql.functions import col
import pyspark.sql.functions as fn

df = spark.createDataFrame([[1, 'r1', 1],
 [1, 'r2', 0],
 [1, 'r2', 1],
 [2, 'r1', 1],
 [3, 'r1', 1],
 [3, 'r2', 1],
 [4, 'r1', 0],
 [5, 'r1', 1],
 [5, 'r2', 0],
 [5, 'r1', 1]], schema=['cust_id', 'req', 'req_met'])
df = df.withColumn('req_met', col("req_met").cast(IntegerType()))
df = df.withColumn('cust_id', col("cust_id").cast(IntegerType()))

我按 cust_idreq 分组做同样的事情,然后计算 req_met。之后,我创建了一个函数来将这些要求降低到 0、1

def floor_req(r):
    if r >= 1:
        return 1
    else:
        return 0
udf_floor_req = udf(floor_req, IntegerType())
gr = df.groupby(['cust_id', 'req'])
df_grouped = gr.agg(fn.sum(col('req_met')).alias('sum_req_met'))
df_grouped_floor = df_grouped.withColumn('sum_req_met', udf_floor_req('sum_req_met'))

现在,我们可以通过计算不同的要求数量和满足的要求总数来检查每个客户是否满足所有要求。

df_req = df_grouped_floor.groupby('cust_id').agg(fn.sum('sum_req_met').alias('sum_req'), 
                                                 fn.count('req').alias('n_req'))

最后,您只需检查两列是否相等:

df_req.filter(df_req['sum_req'] == df_req['n_req'])[['cust_id']].orderBy('cust_id').show()

关于python - 通过过滤对 Pyspark Dataframe 进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42826502/

相关文章:

python - Tkinter StringVar() 显示空白

hadoop - Apache Spark : The number of cores vs. 执行者数量

python - 使用 Visual Studio 将 PySpark 作业发送到 HDInsight 群集?

pyspark - 无法在 Google Colab 上安装 PySpark

python - Nose 跳过调试消息

python - 为特立独行的人编译 libtorrent Rasterbar

python - 在 Python 中屏蔽异常?

python - 用于二进制分类的 spark 逻辑回归 : apply new threshold for predicting 2 classes

apache-spark - 分区如何映射到 Spark 中的任务?

hadoop 作业与 pyspark 和 oozie 陷入僵局