arrays - 在 Spark Scala Dataframe 中迭代具有动态大小的数组列

标签 arrays scala dataframe apache-spark

我熟悉这种方法 - 例如 How to obtain the average of an array-type column in scala-spark over all row entries per entry? 中的一个例子

val array_size = 3
val avgAgg = for (i <- 0 to array_size -1) yield avg($"value".getItem(i))
df.select(array(avgAgg: _*).alias("avg_value")).show(false)

然而,3实际上是硬编码的。

无论我如何努力不使用 UDF,我都无法根据数据框中已存在的数组列的大小动态地执行此类操作。例如:

...
val z =  for (i <- 1 to size($"sortedCol")   ) yield array (element_at($"sortedCol._2", i), element_at($"sortedCol._3", i) )
...
...
.withColumn("Z", array(z: _*)  )

我正在研究如何通过应用于长度可变的现有数组 col 来完成此操作。变换,表达式?不确定。

根据要求提供完整代码:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class abc(year: Int, month: Int, item: String, quantity: Int)

val df0 = Seq(abc(2019, 1, "TV", 8), 
              abc(2019, 7, "AC", 10),  
              abc(2018, 1, "TV", 2),  
              abc(2018, 2, "AC", 3), 
              abc(2019, 2, "CO", 10)).toDS()

val df1 = df0.toDF()
// Gen some data, can be done easier, but not the point.

val itemsList= collect_list(struct("month", "item", "quantity"))

// This nn works.
val nn = 3
val z =  for (i <- 1 to nn) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )
// But want this.
//val z =  for (i <- 1 to size($"sortedCol")   ) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )


val df2 = df1.groupBy($"year")
   .agg(itemsList as "items")
   .withColumn("sortedCol", sort_array($"items", asc = true))  
   .withColumn("S", size($"sortedCol")) // cannot use this either
   .withColumn("Z", array(z: _*)  )
   .drop("items")
   .orderBy($"year".desc)
df2.show(false)
// Col Z is the output I want, but not the null value Array 

UPD

In apache spark SQL, how to remove the duplicate rows when using collect_list in window function?我用一个非常简单的 UDF 来解决,但我正在寻找一种没有 UDF 的方法,特别是在 for 循环 中动态设置 to value 。答案证明某些构造是不可能的——这是正在排序的验证。

最佳答案

如果我正确理解您的需求,您可以简单地使用 transform 函数,如下所示:

val df2 = df1.groupBy($"year")
             .agg(itemsList as "items")
             .withColumn("sortedCol", sort_array($"items", asc = true))


val transform_expr = "transform(sortedCol, x -> array(x.item, x.quantity))"

df2.withColumn("Z", expr(transform_expr)).show(false)

//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|year|items                                 |sortedCol                             |Z                            |
//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|2018|[[1, TV, 2], [2, AC, 3]]              |[[1, TV, 2], [2, AC, 3]]              |[[TV, 2], [AC, 3]]           |
//|2019|[[1, TV, 8], [7, AC, 10], [2, CO, 10]]|[[1, TV, 8], [2, CO, 10], [7, AC, 10]]|[[TV, 8], [CO, 10], [AC, 10]]|
//+----+--------------------------------------+--------------------------------------+-----------------------------+

关于arrays - 在 Spark Scala Dataframe 中迭代具有动态大小的数组列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61338482/

相关文章:

python - 根据给定条件将新列添加到现有数据框列

c++ - 列表功能 C++

c - 求二维数组中每一列的平均值

c - 如何循环并打印 c 中的结构数组?

performance - 在哪种情况下,Scala 2.10.0 编译器可以比 2.9.2 更快或更慢?

python - 仅当其值不属于特定数据类型时才连接 3+ 列

javascript - JS : How to combine Object from two different arrays into one Array/Object

scala - 执行并获取包装在上下文中的函数的返回值?

ScalaTest AsyncFunSuiteLike 多个断言

python - Pandas 重新采样引发由日期时间对象组成的索引的类型错误