scala - 如何从 Spark SQL DataFrame 中的 MapType 列获取键和值

标签 scala apache-spark dataframe apache-spark-sql apache-spark-dataset

我的 parquet 文件中有数据,该文件有 2 个字段:object_id: Stringalpha: Map<> .

它被读入 SparkSQL 中的数据帧,其架构如下所示:

scala> alphaDF.printSchema()
root
 |-- object_id: string (nullable = true)
 |-- ALPHA: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)

我正在使用 Spark 2.0,我正在尝试创建一个新的数据框架,其中的列需要为 object_id加号 ALPHA map 如object_id, key1, key2, key2, ...

我首先尝试看看我是否至少可以像这样访问 map :

scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   alphaDF.map(a => a(0)).collect()

但不幸的是我似乎无法弄清楚如何访问 map 的键。

有人可以告诉我一种获取 object_id 的方法吗?添加映射键作为列名称和映射值作为新数据框中的相应值?

最佳答案

Spark >= 2.3

您可以使用map_keys函数简化该过程:

import org.apache.spark.sql.functions.map_keys

还有map_values函数,但在这里不会直接有用。

Spark <2.3

一般方法可以用几个步骤来表达。首先需要导入:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

和示例数据:

val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")

要提取 key ,我们可以使用 UDF (Spark < 2.3)

val map_keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)

或内置函数

import org.apache.spark.sql.functions.map_keys

val keysDF = df.select(map_keys($"alpha"))

找到不同的:

val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
  .collect.sorted

您还可以使用explode概括keys提取:

import org.apache.spark.sql.functions.explode

val distinctKeys = df
  // Flatten the column into key, value columns
 .select(explode($"alpha"))
 .select($"key")
 .as[String].distinct
 .collect.sorted

选择:

ds.select($"id" +: distinctKeys.map(x => $"alpha".getItem(x).alias(x)): _*)

关于scala - 如何从 Spark SQL DataFrame 中的 MapType 列获取键和值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40602606/

相关文章:

java - 如何在 Play2 scala 模板中设置所选选项?

Scala编译问题 "forward reference extends over definition of value"

scala - 如何将 Option 与 Spark UDF 结合使用

python - PySpark 如何读取具有多种编码字符串的文件

java - 如何在 java 中访问 WrappedArray 的 scala.collection.mutable.WrappedArray 中的值

python - 从 Pandas 数据框中删除不需要的值

python - 根据对 pandas 数据框列的标签引用选择下一列

scala - 远程创建 Akka Actor,无需新的 ActorSystem

join - 使用非等键的自定义连接

Python Pandas - 数据透视表输出意外 float