假设我有一个用例,我想 groupBy
然后将自定义函数应用于分组值。在 Python 中,我可以通过以下方式完成此操作:
df.groupby("id").apply(custom_function)
和
@pandas_udf("id string, prediction double", PandasUDFType.GROUPED_MAP)
def custom_function(id, dataframe):
rf = RandomForestRegressor(n_estimators=25, random_state=42)
rf.fit(train_features, dataframe.quantity_sold)
prediction = rf.predict(test_features)
return pd.DataFrame({'id': id, 'prediction': prediction}, index=[0])
我可以通过以下方式在 Scala 中完成同样的事情:
input.rdd.groupBy(row => row.get(0)).collect().map(data => {
val df = sparkSession.createDataFrame(sparkContext.parallelize(data._2.toSeq), input.schema)
(data._1.toString, df)
}).foldLeft(sparkSession.createDataFrame(sparkContext.emptyRDD[Row], outputSchema))((acc, next) => {
val assembler = new VectorAssembler()
.setInputCols(modelColumns)
.setOutputCol(features)
.transform(next._2)
val forest = oldForest
.fit(assembler)
.transform(testAssembler)
acc.union(forest)
})
如果我们比较这两种解决方法,上面的方法比下面的方法快得多。我尝试在没有 collect
的情况下执行此操作,但我收到错误消息 RDD transformations and actions can only be invoked by the driver, not inside other transformations
。
我知道 collect
将结果作为列表返回给驱动程序,这就是为什么我被迫使用 Scala 集合 API(map
和 flatMap
) 以进一步继续我的处理。
我的问题是,一旦收集到驱动程序,作业是否不应再次传播到执行程序(因为我继续使用 Spark ML API)?或者当代码返回到 main
方法执行的位置时,驱动程序中的所有内容都简单地计算(一旦收集)?基本上,为什么运行速度很慢,有没有什么方法可以在不使用 Python 的情况下改善这个过程?
谢谢!
编辑:设法解决了这个问题(示例如下);假设我们有这个数据集:
+---+-----+------+-----+
|id |first|second|third|
+---+-----+------+-----+
|1 |1.0 |1.0 |1.0 |
|1 |1.0 |2.0 |2.0 |
|1 |1.0 |3.0 |3.0 |
|1 |1.0 |4.0 |4.0 |
|1 |1.0 |5.0 |5.0 |
+---+-----+------+-----+
我们的目标是按 id
分组,然后对分组的列(first
、second
和 third
),我们想训练模型然后预测一些东西(third
列是我们的标签)。
分组并应用 UDAF(如 werner 所建议):
val myAggFct = udaf(MyAgg).apply(array("first", "second", "third"))
df.groupBy("id").agg(myAggFct)
myAggFct
UDF 实现如下:
object MyAgg extends Aggregator[Seq[Double], Seq[Seq[Double]], String] {
override def zero: Seq[Seq[Double]] = scala.collection.mutable.Seq[Seq[Double]]()
override def reduce(b: Seq[Seq[Double]], a: Seq[Double]): Seq[Seq[Double]] = b :+ a
override def merge(b1: Seq[Seq[Double]], b2: Seq[Seq[Double]]): Seq[Seq[Double]] = b1 ++ b2
override def finish(allInts: Seq[Seq[Double]]): String = {
// Defining the attributes (first, second and third, as our dataset)
val array: List[Attribute] = List() :+
new Attribute("first") :+
new Attribute("second") :+
new Attribute("third")
// Creation of the instance and defining what we want to predict, in our case, the last attribute
// aka. `third`
val dataRaw = new Instances("train", new util.ArrayList[Attribute](array.asJava), 0)
dataRaw.setClassIndex(dataRaw.numAttributes() - 1)
// Converting our Seq[Seq[Double]] to Dense Instances, so we can add it to `dataRaw`
// aka. our trained model
dataRaw.addAll(allInts.map(v => new DenseInstance(1.0, v.toArray)).asJava)
// We create a Random Forest object and we use `dataRaw` as classifier
val mlp = new RandomForest()
mlp.buildClassifier(dataRaw)
// Give it a test case, in this case, we want to see where first = 1.0 and second = 2.0 fall into
val testInstance = new DenseInstance(1.0, Seq(1.0, 2.0).toArray)
testInstance.setDataset(dataRaw)
// We classify the instance and add some content for clearer output
mlp.classifyInstance(testInstance).toString + ": " + allInts.mkString(", ")
}
override def bufferEncoder: Encoder[Seq[Seq[Double]]] = newSequenceEncoder[Seq[Seq[Double]]]
override def outputEncoder: Encoder[String] = Encoders.STRING
}
最终结果:
+---+------------------------------------------------------------------------------------------------------------+
|id |myagg$(array(first, second, third)) |
+---+------------------------------------------------------------------------------------------------------------+
|1 |2.2: List(1.0, 1.0, 1.0), List(1.0, 2.0, 2.0), List(1.0, 3.0, 3.0), List(1.0, 4.0, 4.0), List(1.0, 5.0, 5.0)|
+---+------------------------------------------------------------------------------------------------------------+
在这种情况下,在从/向 Java 和 Scala 转换时可能会产生一些开销。
最佳答案
这是 User-Defined Aggregate Function 的一个很好的用例.
什么是用户定义的聚合函数?
用 groupBy 对数据框进行分组后通常是一个或多个聚合函数,如 min , max或 sum用于将属于一组行的所有值聚合为单个值。如果 Spark 的内置函数都不适合您的需要,您可以编写自己的函数,从其中一个组中获取数据并将其聚合为一个新值。
喜欢就可以用
df.groupBy('myCol1).agg(sum('myCol2))
你可以使用
df.groupBy('myCol1).agg(customFunction('myCol2))
customFunction
执行您需要它执行的任何操作,例如将 RandomForestRegressor 应用于一组数据的所有元素。
如何创建用户定义的聚合函数?
这是一个(可以说是简单的)用户定义聚合函数的示例。该函数收集一个序列中一组的所有值,然后将所有这些值连接成一个字符串。
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import spark.implicits._
//some test data: 1,2,3,...,10
val df = (1 to 10).toDF()
//create the user defined aggregation function
object MyAgg extends Aggregator[Int, Seq[Int], String]{
override def zero: Seq[Int] = scala.collection.mutable.Seq[Int]()
override def reduce(b: Seq[Int], a: Int): Seq[Int] = b :+ a
override def merge(b1: Seq[Int], b2: Seq[Int]): Seq[Int] = b1 ++ b2
override def finish(allInts: Seq[Int]): String = allInts.foldLeft("START")((s,b) => s + "_" + b)
override def bufferEncoder: Encoder[Seq[Int]] = newSequenceEncoder[Seq[Int]]
override def outputEncoder: Encoder[String] = Encoders.STRING
}
val myAggFct = udaf(MyAgg).withName("myAgg")
//group the dataframe and apply myAggFct to each group separately
df.groupBy(expr("value % 3")).agg(myAggFct('value)).show
输出:
+-----------+--------------+
|(value % 3)| myagg(value)|
+-----------+--------------+
| 1|START_1_4_7_10|
| 2| START_2_5_8|
| 0| START_3_6_9|
+-----------+--------------+
用户定义的聚合函数如何工作?
reduce
和 merge
这两个函数将一个组的所有值组合到一个由 zero
函数创建的序列中。
中心函数是函数finish
。这里所有收集到的值的序列 (allInts
) 被转换为聚合操作的结果。这将是应用例如 RandomForestRegressor 的地方。由于 finish
函数在执行程序节点上分布式运行,所有需要的附加数据应该是 broadcasted .
注意:上面的示例也可以(更好地)使用 Dataset.reduce 来实现因为我们不需要值作为序列。我们可以在看到值时立即将它们添加到字符串中。但对于回归器,我们需要完整的值列表,因此用户定义的聚合函数在这里是合理的。
关于apache-spark - 通过在 Java 流中使用 SparkSession 在 Spark 中应用自定义函数进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73523124/