scala - 如何根据 spark 数据框中值的累计和为每一行分配一个类别?

标签 scala apache-spark bigdata cumulative-sum

我有一个 spark 数据框,其中包含两列 [Employee 和 Salary],其中薪水按升序排列。

示例数据框

Expected Output: 
| Employee |salary |
| -------- | ------|
|  Emp1    |  10   |
| Emp2     |  20   |
| Emp3     |  30   |
| EMp4     |  35   |
| Emp5     |  36   |
| Emp6     |  50   |
| Emp7     |  70   |

我想对行进行分组,使每个组的聚合值少于 80,并为每个组分配一个类别,如下所示。我会不断地逐行添加薪水,直到总和超过 80。一旦超过 80,我就会分配一个新类别。

Expected Output: 
| Employee |salary | Category|
| -------- | ------|----------
|  Emp1    |  10   |A        |
| Emp2     |  20   |A        |
| Emp3     |  30   |A        |
| EMp4     |  35   |B        |
| Emp5     |  36   |B        |
| Emp6     |  50   |C        |
| Emp7     |  70   |D        |

有没有我们可以在 spark scala 中执行此操作的简单方法?

最佳答案

要解决您的问题,您可以使用自定义 aggregate functionwindow

首先,您需要创建自定义聚合函数。聚合函数由累加器(缓冲区)定义,它将被初始化(值)并在处理新行时更新(reduce > 函数)或遇到另一个累加器(merge 函数)。最后,返回累加器(finish 函数)

在您的情况下,累加器应保留两条信息:

  • 当前员工类别
  • 属于当前类别的以前员工的工资总和

要存储这些信息,您可以使用元组 (Int, Int),第一个元素是当前类别,第二个元素是当前类别以前雇员的工资总和:

  • 您使用 (0, 0) 初始化此元组。
  • 当你遇到一个新行时,如果以前的薪水和当前行的薪水之和超过 80,你增加类别并用当前行的薪水重新初始化以前的薪水,否则你将当前行的薪水添加到以前的薪水'总和。
  • 由于您将使用窗口函数,因此您将按顺序处理行,因此您不需要实现与另一个累加器的合并。
  • 最后,由于您只需要类别,因此您只返回累加器的第一个元素。

因此我们得到以下聚合器实现:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object Labeler extends Aggregator[Int, (Int, Int), Int] {
  override def zero: (Int, Int) = (0, 0)

  override def reduce(catAndSum: (Int, Int), salary: Int): (Int, Int) = {
    if (catAndSum._2 + salary > 80)
      (catAndSum._1 + 1, salary)
    else
      (catAndSum._1, catAndSum._2 + salary)
  }

  override def merge(catAndSum1: (Int, Int), catAndSum2: (Int, Int)): (Int, Int) = {
    throw new NotImplementedError("should be used only over a windows function")
  }

  override def finish(catAndSum: (Int, Int)): Int = catAndSum._1

  override def bufferEncoder: Encoder[(Int, Int)] = Encoders.tuple(Encoders.scalaInt, Encoders.scalaInt)

  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

一旦有了聚合器,就可以使用 udaf 函数将其转换为 spark 聚合函数。

然后您在所有数据框上创建您的窗口并按薪水排序,并在此窗口上应用您的 spark 聚合函数:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}

val labeler = udaf(Labeler)
val window = Window.orderBy("salary")

val result = dataframe.withColumn("category", labeler(col("salary")).over(window))

使用您的示例作为输入数据框,您将获得以下结果数据框:

+--------+------+--------+
|employee|salary|category|
+--------+------+--------+
|Emp1    |10    |0       |
|Emp2    |20    |0       |
|Emp3    |30    |0       |
|Emp4    |35    |1       |
|Emp5    |36    |1       |
|Emp6    |50    |2       |
|Emp7    |70    |3       |
+--------+------+--------+

关于scala - 如何根据 spark 数据框中值的累计和为每一行分配一个类别?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68799179/

相关文章:

list - 在一个函数中修改多个列表并在Scala中返回

Scala 单元测试 : how to validate the returned RDD

scala - 无法读取spark scala中的conf文件

python - 在python中批处理非常大的文本文件

java - 加载大文件时超出 GC 开销限制

hadoop - 如何为 BucketingSink 函数 Flink 设置动态基本路径?

scala - 如何将SBT默认日志级别设置为 "warn"?

scala - 即使列不在数据框中,Spark 也会下推过滤器

scala - foldLeft或foldRight等价于Spark吗?

scala - 如何从 Spark 数据框中删除与正则表达式匹配的行