apache-spark - 通过在 Java 流中使用 SparkSession 在 Spark 中应用自定义函数进行分组?

标签 apache-spark apache-spark-sql

假设我有一个用例,我想 groupBy 然后将自定义函数应用于分组值。在 Python 中,我可以通过以下方式完成此操作:

df.groupby("id").apply(custom_function)

@pandas_udf("id string, prediction double", PandasUDFType.GROUPED_MAP)
def custom_function(id, dataframe):
    rf = RandomForestRegressor(n_estimators=25, random_state=42)

    rf.fit(train_features, dataframe.quantity_sold)

    prediction = rf.predict(test_features)

    return pd.DataFrame({'id': id, 'prediction': prediction}, index=[0])

我可以通过以下方式在 Scala 中完成同样的事情:

input.rdd.groupBy(row => row.get(0)).collect().map(data => {
            val df = sparkSession.createDataFrame(sparkContext.parallelize(data._2.toSeq), input.schema)
 
            (data._1.toString, df)
        }).foldLeft(sparkSession.createDataFrame(sparkContext.emptyRDD[Row], outputSchema))((acc, next) => {
            val assembler = new VectorAssembler()
                .setInputCols(modelColumns)
                .setOutputCol(features)
                .transform(next._2)
 
            val forest = oldForest
                .fit(assembler)
                .transform(testAssembler)
 
            acc.union(forest)
        })

如果我们比较这两种解决方法,上面的方法比下面的方法快得多。我尝试在没有 collect 的情况下执行此操作,但我收到错误消息 RDD transformations and actions can only be invoked by the driver, not inside other transformations

我知道 collect 将结果作为列表返回给驱动程序,这就是为什么我被迫使用 Scala 集合 API(mapflatMap ) 以进一步继续我的处理。

我的问题是,一旦收集到驱动程序,作业是否不应再次传播到执行程序(因为我继续使用 Spark ML API)?或者当代码返回到 main 方法执行的位置时,驱动程序中的所有内容都简单地计算(一旦收集)?基本上,为什么运行速度很慢,有没有什么方法可以在不使用 Python 的情况下改善这个过程?

谢谢!


编辑:设法解决了这个问题(示例如下);假设我们有这个数据集:

+---+-----+------+-----+
|id |first|second|third|
+---+-----+------+-----+
|1  |1.0  |1.0   |1.0  |
|1  |1.0  |2.0   |2.0  |
|1  |1.0  |3.0   |3.0  |
|1  |1.0  |4.0   |4.0  |
|1  |1.0  |5.0   |5.0  |
+---+-----+------+-----+

我们的目标是按 id 分组,然后对分组的列(firstsecondthird ),我们想训练模型然后预测一些东西(third 列是我们的标签)。

分组并应用 UDAF(如 werner 所建议):

val myAggFct = udaf(MyAgg).apply(array("first", "second", "third"))

df.groupBy("id").agg(myAggFct)

myAggFct UDF 实现如下:

object MyAgg extends Aggregator[Seq[Double], Seq[Seq[Double]], String] {
  override def zero: Seq[Seq[Double]] = scala.collection.mutable.Seq[Seq[Double]]()

  override def reduce(b: Seq[Seq[Double]], a: Seq[Double]): Seq[Seq[Double]] = b :+ a

  override def merge(b1: Seq[Seq[Double]], b2: Seq[Seq[Double]]): Seq[Seq[Double]] = b1 ++ b2

  override def finish(allInts: Seq[Seq[Double]]): String = {

    // Defining the attributes (first, second and third, as our dataset)
    val array: List[Attribute] = List() :+
      new Attribute("first") :+
      new Attribute("second") :+
      new Attribute("third")

    // Creation of the instance and defining what we want to predict, in our case, the last attribute
    // aka. `third`
    val dataRaw = new Instances("train", new util.ArrayList[Attribute](array.asJava), 0)
    dataRaw.setClassIndex(dataRaw.numAttributes() - 1)

    // Converting our Seq[Seq[Double]] to Dense Instances, so we can add it to `dataRaw`
    // aka. our trained model
    dataRaw.addAll(allInts.map(v => new DenseInstance(1.0, v.toArray)).asJava)

    // We create a Random Forest object and we use `dataRaw` as classifier
    val mlp = new RandomForest()
    mlp.buildClassifier(dataRaw)

    // Give it a test case, in this case, we want to see where first = 1.0 and second = 2.0 fall into
    val testInstance = new DenseInstance(1.0, Seq(1.0, 2.0).toArray)
    testInstance.setDataset(dataRaw)

    // We classify the instance and add some content for clearer output
    mlp.classifyInstance(testInstance).toString + ": " + allInts.mkString(", ")
  }

  override def bufferEncoder: Encoder[Seq[Seq[Double]]] = newSequenceEncoder[Seq[Seq[Double]]]

  override def outputEncoder: Encoder[String] = Encoders.STRING
}

最终结果:

+---+------------------------------------------------------------------------------------------------------------+
|id |myagg$(array(first, second, third))                                                                         |
+---+------------------------------------------------------------------------------------------------------------+
|1  |2.2: List(1.0, 1.0, 1.0), List(1.0, 2.0, 2.0), List(1.0, 3.0, 3.0), List(1.0, 4.0, 4.0), List(1.0, 5.0, 5.0)|
+---+------------------------------------------------------------------------------------------------------------+

在这种情况下,在从/向 Java 和 Scala 转换时可能会产生一些开销。

最佳答案

这是 User-Defined Aggregate Function 的一个很好的用例.

什么是用户定义的聚合函数?

groupBy 对数据框进行分组后通常是一个或多个聚合函数,如 min , maxsum用于将属于一组行的所有值聚合为单个值。如果 Spark 的内置函数都不适合您的需要,您可以编写自己的函数,从其中一个组中获取数据并将其聚合为一个新值。

喜欢就可以用

df.groupBy('myCol1).agg(sum('myCol2))

你可以使用

df.groupBy('myCol1).agg(customFunction('myCol2))

customFunction 执行您需要它执行的任何操作,例如将 RandomForestRegressor 应用于一组数据的所有元素。

如何创建用户定义的聚合函数?

这是一个(可以说是简单的)用户定义聚合函数的示例。该函数收集一个序列中一组的所有值,然后将所有这些值连接成一个字符串。

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import spark.implicits._

//some test data: 1,2,3,...,10
val df = (1 to 10).toDF()

//create the user defined aggregation function
object MyAgg extends Aggregator[Int, Seq[Int], String]{
  override def zero: Seq[Int] = scala.collection.mutable.Seq[Int]()

  override def reduce(b: Seq[Int], a: Int): Seq[Int] = b :+ a

  override def merge(b1: Seq[Int], b2: Seq[Int]): Seq[Int] = b1 ++ b2

  override def finish(allInts: Seq[Int]): String = allInts.foldLeft("START")((s,b) => s + "_" + b)

  override def bufferEncoder: Encoder[Seq[Int]] = newSequenceEncoder[Seq[Int]]

  override def outputEncoder: Encoder[String] = Encoders.STRING
}
val myAggFct = udaf(MyAgg).withName("myAgg")

//group the dataframe and apply myAggFct to each group separately
df.groupBy(expr("value % 3")).agg(myAggFct('value)).show

输出:

+-----------+--------------+
|(value % 3)|  myagg(value)|
+-----------+--------------+
|          1|START_1_4_7_10|
|          2|   START_2_5_8|
|          0|   START_3_6_9|
+-----------+--------------+

用户定义的聚合函数如何工作?

reducemerge 这两个函数将一个组的所有值组合到一个由 zero 函数创建的序列中。

中心函数是函数finish。这里所有收集到的值的序列 (allInts) 被转换为聚合操作的结果。这将是应用例如 RandomForestRegressor 的地方。由于 finish 函数在执行程序节点上分布式运行,所有需要的附加数据应该是 broadcasted .

注意:上面的示例也可以(更好地)使用 Dataset.reduce 来实现因为我们不需要值作为序列。我们可以在看到值时立即将它们添加到字符串中。但对于回归器,我们需要完整的值列表,因此用户定义的聚合函数在这里是合理的。

关于apache-spark - 通过在 Java 流中使用 SparkSession 在 Spark 中应用自定义函数进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73523124/

相关文章:

apache-spark - 如何在 Google Dataproc 主节点上启用 pyspark HIVE 支持

python - PySpark groupByKey 返回 pyspark.resultiterable.ResultIterable

sql - Spark SQL : Cache Memory footprint improves with 'order by'

apache-spark - Spark 数据集 createOrReplaceTempView ViewName 限制

python - 当列表值与 Pyspark 数据框中列值的子字符串匹配时填充新列

apache-spark - 使用 Spark 生成拼花数据文件以测试 Hive/Presto/Drill/等的快速方法是什么?

apache-spark - 使用 Spark.sql 插入 TempView

scala - org.apache.spark.sql.AnalysisException:无法解析给定的输入列

apache-spark - 在 Spark SQL 中按多列进行分区

apache-spark - 如何在没有 hive-site.xml 的情况下将 Spark SQL 连接到远程 Hive 元存储(通过节俭协议(protocol))?