scala - 在 Spark 中使用自定义函数聚合多列

标签 scala apache-spark dataframe apache-spark-sql orc

我想知道是否有某种方法可以为多列的 Spark 数据帧指定自定义聚合函数。

我有一个这样类型的表(名称、项目、价格):

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59

到:

我想将每个人的项目和成本汇总到这样的列表中:
john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

这在数据帧中可能吗?我最近了解到 collect_list但它似乎只适用于一列。

最佳答案

执行此操作的最简单方法 DataFrame就是先收集两个list,然后用一个UDFzip将两个列表放在一起。就像是:

import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._

val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

val df2 = df.groupBy("name").agg(
  collect_list(col("food")) as "food",
  collect_list(col("price")) as "price" 
).withColumn("food", zipper(col("food"), col("price"))).drop("price")

df2.show(false)
# +----+---------------------------------------------+
# |name|food                                         |
# +----+---------------------------------------------+
# |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
# |bill|[[apple,0.99], [taco,2.59]]                  |
# +----+---------------------------------------------+

关于scala - 在 Spark 中使用自定义函数聚合多列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37737843/

相关文章:

scala - 在 Scala Repl 中暂时禁用制表符补全

scala - 将自动递增值添加到 scala 映射以获取空值

apache-spark - 如何将 H2OFrame 中的列转换为 python 列表?

python - 如何从 Pandas DataFrame 中提取值,而不是 Series(不引用索引)?

python - 将一列交换为一行

scala - 如何将自定义日期时间格式转换为时间戳?

scala - 如何正确使用 scalac -Xlint

python - pyspark 多列条件并返回新列

hadoop - Apache Spark:无法构建:[错误]服务器访问错误..jetty

arrays - 在 Julia 中将数组转换为 DataFrame 或保存为 CSV