scala - 如何使用我的相等比较器对 Spark DataFrame 进行 GroupBy?

标签 scala sorting apache-spark apache-spark-sql

我想在 DataFrame 上使用 GroupBy 运算符和我自己的相等比较器。

假设我想执行类似的操作:

df.groupBy("Year","Month").sum("Counter")

在此数据框中:

Year    | Month      | Counter  
---------------------------
2012    | Jan        | 100          
12      | January    | 200       
12      | Janu       | 300       
2012    | Feb        | 400       
13      | Febr       | 500

我必须实现两个比较器:

1) 对于年份列:p.e. “2012”==“12”

2) 对于月份列:p.e. “一月”==“一月”==“一月”

假设我已经实现了这两个比较器。我怎样才能调用它们?如 this例如,我已经知道我必须将 DataFrame 转换为 RDD 才能使用我的比较器。

我考虑过使用RDD GroupBy .

请注意,我确实需要使用比较器来完成此操作。我无法使用 UDF、更改数据或创建新列。 future 的想法是拥有密文列,其中我有函数可以让我比较两个密文是否相同。我想在我的比较器中使用它们。

编辑:

此刻,我尝试仅用一列来完成此操作,例如:

df.groupBy("Year").sum("Counter")

我有一个包装类:

class ExampleWrapperYear (val year: Any) extends Serializable {
      // override hashCode and Equals methods
}

然后,我正在这样做:

val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()

我的问题是如何进行“求和”,以及如何将 keyBy 与多个列一起使用以使用 ExampleWrapperYear 和 ExampleWrapperMonth。

最佳答案

这个解决方案应该可行。这里是实现 hashCode 和 equals 的案例类(我们可以将它们称为比较器)。

可以根据不同的密文修改/更新hashCode和equals

  case class Year(var year:Int){

    override def hashCode(): Int = {
      this.year = this.year match {
        case 2012 => 2012
        case 12 => 2012
        case 13 => 2013
        case _ => this.year
      }
      this.year.hashCode()
    }

    override def equals(that: Any): Boolean ={
      val year1 = 2000 + that.asInstanceOf[Year].year % 100
      val year2 = 2000 + this.year % 100
      if (year1 == year2)
        true
      else
        false
    }
  }

  case class Month(var month:String){

    override def hashCode(): Int = {
      this.month = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      this.month.hashCode
    }

    override def equals(that: Any): Boolean ={
      val month1 = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      val month2 = that.asInstanceOf[Month].month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => that.asInstanceOf[Month].month
      }
      if (month1.equals(month2))
        true
      else
        false
    }
  }

这是分组键的重要比较器,它仅使用单独的 col 比较器

  case class Key(var year:Year, var month:Month){

    override def hashCode(): Int ={
      this.year.hashCode() + this.month.hashCode()
    }

    override def equals(that: Any): Boolean ={
      if ( this.year.equals(that.asInstanceOf[Key].year) && this.month.equals(that.asInstanceOf[Key].month))
        true
      else
        false
    }
  }

  case class Record(year:Int,month:String,counter:Int)

  val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data.csv").as[Record]

  df.rdd.groupBy[Key](
      (record:Record)=>Key(Year(record.year), Month(record.month)))
      .map(x=> Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
      .toDS().show()

这给出了

+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012|  Feb|    800|
|2013|  Feb|    500|
|2012|  Jan|    700|
+----+-----+-------+

for this input in data.csv

Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100

请注意,对于案例类“年”和“月”,还将值更新为标准值(否则无法预测它选择哪个值)。

关于scala - 如何使用我的相等比较器对 Spark DataFrame 进行 GroupBy?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55147029/

相关文章:

apache-spark - 在 Spark 中按时间戳分区 Parquet 文件的最佳做法是什么?

Scala IDE 警告 : "anonymous function convertible to method value"

scala - Elastic Search中的文档删除需要花费时间

Scala:根据另一个有条件地执行一个 Future 的最惯用方式?

sorting - 如何使用无痛脚本语言从列表列表中区分行?

sorting - 内部查询中单个查询的 Elasticsearch 排序结果

java - 对整数数组的 ArrayList 进行排序

python - 当我尝试启动 PySpark 时出现空指针异常

java - 从kafka消息中获取主题

scala - SBT:如何获取IntegrationTest中的所有定义?