假设我通过以下方式将 json 文件加载到 Spark 1.6 中
sqlContext.read.json("/hdfs/")
它给了我一个具有以下架构的数据框:
root
|-- id: array (nullable = true)
| |-- element: string (containsNull = true)
|-- checked: array (nullable = true)
| |-- element: string (containsNull = true)
|-- color: array (nullable = true)
| |-- element: string (containsNull = true)
|-- type: array (nullable = true)
| |-- element: string (containsNull = true)
DF 只有一行,里面有我所有项目的数组。
+--------------------+--------------------+--------------------+
| id_e| checked_e| color_e|
+--------------------+--------------------+--------------------+
|[0218797c-77a6-45...|[false, true, tru...|[null, null, null...|
+--------------------+--------------------+--------------------+
我想要一个 DF,其中数组分解为每行一个项目。
+--------------------+-----+-------+
| id|color|checked|
+--------------------+-----+-------+
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
...
到目前为止,我通过从数组 DF 创建一个临时表并在这些行上使用横向 View explode 的 sql 来实现这一点。
val results = sqlContext.sql("
SELECT id, color, checked from temptable
lateral view explode(checked_e) temptable as checked
lateral view explode(id_e) temptable as id
lateral view explode(color_e) temptable as color
")
有没有办法直接使用 Dataframe 函数而不使用 SQL 来实现这一点?我知道有类似 df.explode(...) 的东西,但我无法让它与我的数据一起使用
编辑:看来 explode 并不是我真正想要的。 我想要一个新的数据框,其中逐行包含数组的每个项目。 explode 函数实际上返回的行数比我的初始数据集多。
最佳答案
以下解决方案应该有效。
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
val data = Seq((Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)))
val df = sqlContext.createDataFrame(data)
val udf3 = udf[Seq[(Int, Int, Int)], Seq[Int], Seq[Int], Seq[Int]]{
case (a, b, c) => (a,b, c).zipped.toSeq
}
val df3 = df.select(udf3($"_1", $"_2", $"_3").alias("udf3"))
val exploded = df3.select(explode($"udf3").alias("col3"))
exploded.withColumn("first", $"col3".getItem("_1"))
.withColumn("second", $"col3".getItem("_2"))
.withColumn("third", $"col3".getItem("_3")).show
如果直接使用普通的 Scala 代码会更简单。它也可能更有效。如果只有一行,Spark 无论如何也帮不上忙。
val data = Seq((Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)))
val seqExploded = data.flatMap{
case (a: Seq[Int], b: Seq[Int], c: Seq[Int]) => (a, b, c).zipped.toSeq
}
val dfTheSame=sqlContext.createDataFrame(seqExploded)
dfTheSame.show
关于arrays - Spark 在 Scala 中用数组分解嵌套 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38243717/