我熟悉这种方法 - 例如 How to obtain the average of an array-type column in scala-spark over all row entries per entry? 中的一个例子
val array_size = 3
val avgAgg = for (i <- 0 to array_size -1) yield avg($"value".getItem(i))
df.select(array(avgAgg: _*).alias("avg_value")).show(false)
然而,3实际上是硬编码的。
无论我如何努力不使用 UDF,我都无法根据数据框中已存在的数组列的大小动态地执行此类操作。例如:
...
val z = for (i <- 1 to size($"sortedCol") ) yield array (element_at($"sortedCol._2", i), element_at($"sortedCol._3", i) )
...
...
.withColumn("Z", array(z: _*) )
我正在研究如何通过应用于长度可变的现有数组 col 来完成此操作。变换,表达式?不确定。
根据要求提供完整代码:
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
case class abc(year: Int, month: Int, item: String, quantity: Int)
val df0 = Seq(abc(2019, 1, "TV", 8),
abc(2019, 7, "AC", 10),
abc(2018, 1, "TV", 2),
abc(2018, 2, "AC", 3),
abc(2019, 2, "CO", 10)).toDS()
val df1 = df0.toDF()
// Gen some data, can be done easier, but not the point.
val itemsList= collect_list(struct("month", "item", "quantity"))
// This nn works.
val nn = 3
val z = for (i <- 1 to nn) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )
// But want this.
//val z = for (i <- 1 to size($"sortedCol") ) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )
val df2 = df1.groupBy($"year")
.agg(itemsList as "items")
.withColumn("sortedCol", sort_array($"items", asc = true))
.withColumn("S", size($"sortedCol")) // cannot use this either
.withColumn("Z", array(z: _*) )
.drop("items")
.orderBy($"year".desc)
df2.show(false)
// Col Z is the output I want, but not the null value Array
UPD
In apache spark SQL, how to remove the duplicate rows when using collect_list in window function?我用一个非常简单的 UDF 来解决,但我正在寻找一种没有 UDF 的方法,特别是在 for 循环
中动态设置 to value
。答案证明某些构造是不可能的——这是正在排序的验证。
最佳答案
如果我正确理解您的需求,您可以简单地使用 transform
函数,如下所示:
val df2 = df1.groupBy($"year")
.agg(itemsList as "items")
.withColumn("sortedCol", sort_array($"items", asc = true))
val transform_expr = "transform(sortedCol, x -> array(x.item, x.quantity))"
df2.withColumn("Z", expr(transform_expr)).show(false)
//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|year|items |sortedCol |Z |
//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|2018|[[1, TV, 2], [2, AC, 3]] |[[1, TV, 2], [2, AC, 3]] |[[TV, 2], [AC, 3]] |
//|2019|[[1, TV, 8], [7, AC, 10], [2, CO, 10]]|[[1, TV, 8], [2, CO, 10], [7, AC, 10]]|[[TV, 8], [CO, 10], [AC, 10]]|
//+----+--------------------------------------+--------------------------------------+-----------------------------+
关于arrays - 在 Spark Scala Dataframe 中迭代具有动态大小的数组列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61338482/