group-by - 使用用户定义函数聚合 Pyspark 数据帧

标签 group-by apache-spark-sql

如何将“groupby(key).agg(”与用户定义的函数一起使用?具体来说,我需要每个键的所有唯一值的列表[不计数]。

最佳答案

collect_setcollect_list(分别用于无序和有序结果)可用于后处理 groupby 结果。从一个简单的 Spark 数据框开始

    df = sqlContext.createDataFrame(
    [('first-neuron', 1, [0.0, 1.0, 2.0]), 
    ('first-neuron', 2, [1.0, 2.0, 3.0, 4.0])], 
    ("neuron_id", "time", "V"))

假设目标是返回每个神经元的 V 列表的最长长度(按名称分组)

    from pyspark.sql import functions as F
    grouped_df = tile_img_df.groupby('neuron_id').agg(F.collect_list('V'))

我们现在已将 V 列表分组为列表列表。因为我们想要最长的长度,所以我们可以跑

    import pyspark.sql.types as sq_types
    len_udf = F.udf(lambda v_list: int(np.max([len(v) in v_list])),
                      returnType = sq_types.IntegerType())
    max_len_df = grouped_df.withColumn('max_len',len_udf('collect_list(V)'))

获取添加了V列表最大长度的max_len列

关于group-by - 使用用户定义函数聚合 Pyspark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37283684/

相关文章:

python - 计算按特定列分组的项目之间的平均时间差

entity-framework - 如何在没有 if 语句(SUM/AVG)的情况下实现以下查询?

sql - 将根据 CASE 语句计算的两列相乘

java - Spark SQL RowFactory 返回空行

scala - 如果 SparkSession 没有关闭会发生什么?

pyspark - 通过对多列进行分组来用平均值填充缺失值

mysql - SQL 错误代码 1055。按组标记的最小日期

SQL 不同分组依据

scala - 如何使用 Spark DataFrames 和 Cassandra 设置命名策略

python - 如何将 RDD 的元素组合并收集到 pyspark 中的列表中