java - 如何在java中展平spark数据集中的包装数组

标签 java apache-spark

使用 Spark 2.2 Java 1.8

我需要收集数组列的集合。但它给了我 WrappedArray。请参阅下文。

Dataset<Row> df2 = df.groupBy("id").agg(collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+ 
# |id|                         collect_list(values) | 
# +-----+----------------------------------------------+ 
# |1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]| 
# |2    |[WrappedArray(2), WrappedArray(3)]            | 
# +-----+----------------------------------------------+

Expected output : = 

# +-----+------------------+
# |store|           values |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

如何在 Spark java 中实现上述输出。有人可以帮忙吗?谢谢。

最佳答案

这是使用 UDF 的 scala 等效项(不是 java Guy):

//df.show(false)

+-----+----------------------------------------------+
|store|values                                        |
+-----+----------------------------------------------+
|1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
|2    |[WrappedArray(2), WrappedArray(3)]            |
+-----+----------------------------------------------+

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val flattenWrappedArray = udf((value:  Seq[Seq[Int]]) => {value.flatten})
df.withColumn("values_new",flattenWrappedArray($"values")).show(false)

输出:

+-----+----------------------------------------------+-------------+
|store|values                                        |values_new   |
+-----+----------------------------------------------+-------------+
|1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|[1,2,3,4,5,6]|
|2    |[WrappedArray(2), WrappedArray(3)]            |[2,3]        |
+-----+----------------------------------------------+-------------+    

希望这有帮助!

关于java - 如何在java中展平spark数据集中的包装数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52258050/

相关文章:

尝试使用 ReetrantLock(true) 锁定/解锁时 Java 线程锁

java - vector 值到数据库

java - 无法使该网格正确对齐

hadoop - 最小化Google Dataproc上Apache Spark作业的初始化时间的最佳方法是什么?

apache-spark - 如何使用具有时间段约束和其他条件的函数 PySpark 的窗口

scala - 负逻辑实现不适用于spark/scala

Java getDesktop() 打开命令

apache-spark - 如何从 SparkSession 获取作业或应用程序 ID?

apache-spark - PySpark 计数在 RDD 中按组区分

java - Spring @EnableResourceServer 与 @EnableOAuth2Sso