apache-spark - 访问 Spark 中的嵌套数据

标签 apache-spark dataframe apache-spark-sql

我有一个嵌套案例类的集合。我的工作是使用这些案例类生成数据集,并将输出写入 parquet。

我非常恼火地发现我必须手动进行大量工作才能加载这些数据并将其转换回案例类,以便在后续作业中使用它。无论如何,这就是我现在正在尝试做的事情。

我的案例类如下:

case class Person(userId: String, tech: Option[Tech])
case class Tech(browsers: Seq[Browser], platforms: Seq[Platform])
case class Browser(family: String, version: Int)

所以我正在加载 Parquet 数据。我可以通过以下方式将 tech 数据作为 Row 获取:

val df = sqlContext.load("part-r-00716.gz.parquet")
val x = df.head
val tech = x.getStruct(x.fieldIndex("tech"))

但现在我找不到如何实际迭代浏览器。如果我尝试 val browsers = tech.getStruct(tech.fieldIndex("browsers")) 我会得到一个异常:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row

如何使用 Spark 1.5.2 迭代嵌套浏览器数据?

更新 事实上,我的案例类包含可选值,因此 Browser 实际上是:

case class Browser(family: String,
               major: Option[String] = None, 
               minor: Option[String] = None,
               patch: Option[String] = None, 
               language: String,
               timesSeen: Long = 1,
               firstSeenAt: Long,
               lastSeenAt: Long)

我也有类似的Os:

case class Os(family: String,
          major: Option[String] = None,
          minor: Option[String] = None,
          patch: Option[String] = None,
          patchMinor: Option[String],
          override val timesSeen: Long = 1,
          override val firstSeenAt: Long,
          override val lastSeenAt: Long)

所以Tech确实是:

case class Technographic(browsers: Seq[Browser], 
                     devices: Seq[Device],
                     oss: Seq[Os])

现在,考虑到某些值是可选的,我需要一个解决方案来正确地重建我的案例类。当前的解决方案不支持 None 值,因此例如给定输入数据:

Tech(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en"), timesSeen=3),
    Browser(family=None, major=None, language=Some("en-us"), timesSeen=1),
    Browser(family=Some("Firefox), major=None, language=None, timesSeen=1)
  )
)

我需要它来加载数据,如下所示:

family=IE, major=7, language=en, timesSeen=3,
family=None, major=None, language=en-us, timesSeen=1,
family=Firefox, major=None, language=None, timesSeen=1

由于当前解决方案不支持 None 值,因此实际上每个列表项具有任意数量的值,即:

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
timesSeen = [3, 1, 1]

如您所见,无法将最终数据(由 Spark 返回)转换为生成它的案例类。

我该如何解决这种疯狂的问题?

最佳答案

一些例子

// Select two columns
df.select("userId", "tech.browsers").show()

// Select the nested values only
df.select("tech.browsers").show(truncate = false)
+-------------------------+
|browsers                 |
+-------------------------+
|[[Firefox,4], [Chrome,2]]|
|[[Firefox,4], [Chrome,2]]|
|[[IE,25]]                |
|[]                       |
|null                     |
+-------------------------+

// Extract the family (nested value)
// This way you can iterate over the persons, and get their browsers
// Family values are nested
df.select("tech.browsers.family").show()
+-----------------+
|           family|
+-----------------+
|[Firefox, Chrome]|
|[Firefox, Chrome]|
|             [IE]|
|               []|
|             null|
+-----------------+

// Normalize the family: One row for each family
// Then you can iterate over all families
// Family values are un-nested, empty values/null/None are handled by explode()
df.select(explode(col("tech.browsers.family")).alias("family")).show()
+-------+
| family|
+-------+
|Firefox|
| Chrome|
|Firefox|
| Chrome|
|     IE|
+-------+

基于上一个示例:

val families = df.select(explode(col("tech.browsers.family")))
  .map(r => r.getString(0)).distinct().collect().toList
println(families)

给出“正常”本地 Scala 列表中唯一的浏览器列表:

List(IE, Firefox, Chrome)

关于apache-spark - 访问 Spark 中的嵌套数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34043031/

相关文章:

java - 如何使用 Spark Cassandra 连接器插入 TimeUUID 和 TimeStamp?

apache-spark - 如何解释 Spark Stage UI 中的输入大小/记录

r - 当我拆分列表中的data.frame时,如何使输出更加优雅?

python - 比较 2 个 DataFrame 的半匹配行

sql - 将日光节约时间字符串转换为时间戳会给出错误的结果

python - 如何优化将Spark数据帧的每一行写为单独的文件

scala - 从Serializable Scala对象调用Hadoop FileSystem操作的最佳方法是什么

python - 将一个数据帧除以另一个数据帧的问题

java - Spark Group By Key to (String, Iterable<String>)

python - 如何有效地为数据框的列名称添加前缀,而无需在 Pyspark 中创建新的数据框?