apache-spark - 将聚合列添加到 Spark DataFrame

标签 apache-spark apache-spark-sql pyspark

我有一个 Spark DataFrame,如下所示:

| id | value | bin |
|----+-------+-----|
|  1 |   3.4 |   2 |
|  2 |   2.6 |   1 |
|  3 |   1.8 |   1 |
|  4 |   9.6 |   2 |

我有一个函数f它接受一个值数组并返回一个数字。我想向上面的数据框添加一列,其中每行中新列的值是 f 的值对于所有 value具有相同 bin 的条目条目,即:

| id | value | bin | f_value       |
|----+-------+-----+---------------|
|  1 |   3.4 |   2 | f([3.4, 9.6]) |
|  2 |   2.6 |   1 | f([2.6, 1.8]) |
|  3 |   1.8 |   1 | f([2.6, 1.8]) |
|  4 |   9.6 |   2 | f([3.4, 9.6]) |

因为我需要聚合所有 valuebin ,我无法使用 withColumn函数来添加这个新列。在用户定义的聚合函数进入 Spark 之前,最好的方法是什么?

最佳答案

下面的代码未经测试,只是一个想法。

在 Hive 中,可以使用 collect_list 来完成此操作功能。

val newDF = sqlContext.sql(
    "select bin, collect_list() from aboveDF group by bin")

下一步加入 bin上的aboveDFnewDF

这是您要找的吗?

关于apache-spark - 将聚合列添加到 Spark DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30510534/

相关文章:

java - 尽管mapWithState中的元素相同,但为什么所有元素都被打印

scala - 应用程序中的 Spark 调度 : performance issue

linux - 由于转义字符导致找不到文件错误

apache-spark - 如何在 pyspark 数据帧读取方法中包含分区列

apache-spark - pyspark 如何有效地进行这种转换?

apache-spark - 为什么spark执行器会收到SIGTERM?

dictionary - Spark mapPartitions 与 transient 惰性 val

scala - 将 Spark DataFrame 保存到具有 map<string,string> 列类型的 csv 文件

python - 为什么对 rand() 生成的列进行操作的 PySpark UDF 会失败?

python - PySpark:不使用循环将 DataFrame 拆分为多个 DataFrame