scala - 如何遍历 Spark 中的模式?

标签 scala apache-spark

我想迭代 Spark 中的模式。使用 df.schema给出嵌套列表 StructTypeStructFields .

根元素可以像这样被索引。

IN: val temp = df.schema

IN: temp(0)
OUT: StructField(A,StringType,true)

IN: temp(3)
OUT: StructField(D,StructType(StructField(D1,StructType(StructField(D11,StringType,true), StructField(D12,StringType,true), StructField(D13,StringType,true)),true), StructField(D2,StringType,true), StructField(D3,StringType,true)),true)

当我尝试访问嵌套的 StructType 时,出现以下情况
IN: val temp1 = temp(3).dataType

IN: temp1(0)
OUT:
Name: Unknown Error
Message: <console>:38: error: org.apache.spark.sql.types.DataType does not take parameters
       temp1(0)
            ^
StackTrace: 

我不明白的是,这两个 temptemp1属于 StructType类,但是 temp是可迭代的,但 temp1不是。
IN: temp.getClass
OUT: class org.apache.spark.sql.types.StructType

IN: temp1.getClass
OUT: class org.apache.spark.sql.types.StructType

我也试过 dtypes但在尝试访问嵌套元素时遇到了类似的问题。
IN: df.dtypes(3)(0)
OUT:
Name: Unknown Error
Message: <console>:36: error: (String, String) does not take parameters
       df.dtypes(3)(0)
                   ^
StackTrace: 

那么,如何在知道子字段之前遍历模式?

最佳答案

好吧,如果您想要所有嵌套列的列表,您可以编写这样的递归函数

鉴于:

  val schema = StructType(
    StructField("name", StringType) ::
      StructField("nameSecond", StringType) ::
      StructField("nameDouble", StringType) ::
      StructField("someStruct", StructType(
        StructField("insideS", StringType) ::
          StructField("insideD", StructType(
            StructField("inside1", StringType) :: Nil
          )) ::
          Nil
      )) ::
      Nil
  )
  val rdd = session.sparkContext.emptyRDD[Row]
  val df = session.createDataFrame(rdd, schema)

 df.printSchema()

这将产生:
root
 |-- name: string (nullable = true)
 |-- nameSecond: string (nullable = true)
 |-- nameDouble: string (nullable = true)
 |-- someStruct: struct (nullable = true)
 |    |-- insideS: string (nullable = true)
 |    |-- insideD: struct (nullable = true)
 |    |    |-- inside1: string (nullable = true)

如果你想要列的全名列表,你可以这样写:
def fullFlattenSchema(schema: StructType): Seq[String] = {
  def helper(schema: StructType, prefix: String): Seq[String] = {
    val fullName: String => String = name => if (prefix.isEmpty) name else s"$prefix.$name"
    schema.fields.flatMap {
      case StructField(name, inner: StructType, _, _) =>
        fullName(name) +: helper(inner, fullName(name))
      case StructField(name, _, _, _) => Seq(fullName(name))
    }
  }

  helper(schema, "")
}

哪个将返回:
ArraySeq(name, nameSecond, nameDouble, someStruct, someStruct.insideS, someStruct.insideD, someStruct.insideD.inside1)

关于scala - 如何遍历 Spark 中的模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51374630/

相关文章:

scala - Play 2 Scala 使用多个枚举器提供 Websocket 输出 Iteratee(PatchPannel?)

scala - IntelliJ : Exception in thread "main" java. lang.NoClassDefFoundError: org/apache/spark/sql/types/DataType

scala - 如何确定在单个节点上运行的 Spark 的最佳设置?

json - 使用 Scala 中的列表解析递归 JSON 结构

scala - Jrebel/SBT如何在不重启的情况下重新编译webapp

scala - Scala 中的免费应用程序

scala - 在 sbt 项目中使用 Maven Central 的处理库时出现奇怪的错误

python - Spark jdbc.write 到 mysql 并出现 null 错误

apache-spark - Databricks Spark:java.lang.OutOfMemoryError:超出了GC开销限制i

apache-spark - 使用 Spark.SQL 编码的 Apache Spark 进行结构化流处理