scala - JSON 格式的 Spark DataFrame 列上的隐式模式发现

标签 scala apache-spark

我正在 Scala 阅读中编写 ETL Spark (2.4) 作业 ; - 在 S3 上使用 glob 模式分隔的 CSV 文件。数据加载到 DataFrame 中,并包含一个列(假设它名为 custom )和一个 JSON 格式的字符串( 多级嵌套 )。目标是自动从该列推断架构,以便可以在 S3 中为 Parquet 文件上的写接收器构造它。

这篇文章 ( How to query JSON data column using Spark DataFrames? ) 建议 schema_of_json从 Spark 2.4 开始,可以从 JSON 格式的列或字符串推断架构。

这是我尝试过的:

val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

df.withColumn(
    "nestedCustom",
    from_json(col("custom"), jsonSchema, Map[String, String]())
)

但以上不起作用并引发此异常:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]

请记住,我正在过滤掉 custom 上的空值对于这个数据帧。

编辑:整个代码如下。

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
  * RandomName entry point.
  *
  * @author Random author
  */
object RandomName {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("RandomName")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
      .getOrCreate

    import spark.implicits._

    val randomName: RandomName = new RandomName(spark)

    val df: sql.DataFrame  = randomName.read().filter($"custom".isNotNull)
    val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

    df.withColumn(
      "nestedCustom",
      from_json(col("custom"), jsonSchema, Map[String, String]())
    )

    df.show

    spark.stop
  }
}

class RandomName(private val spark: SparkSession) {

  /**
    * Reads CSV files from S3 and creates a sql.DataFrame.
    *
    * @return a sql.DataFrame
    */
  def read(): sql.DataFrame = {
    val tableSchema = StructType(
      Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true),
        StructField("c", DateType, true),
        StructField("custom", StringType, true)
      ))

    spark.read
      .format("csv")
      .option("sep", ";")
      .option("header", "true")
      .option("inferSchema", "true")
      .schema(tableSchema)
      .load("s3://random-bucket/*")
  }
}

以及一个 JSON 示例:
{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "ramdom_data"
      },
      "lvl3b":  {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    }
  }
}

最佳答案

这是 custom 的指标不是 schema_of_json 的有效输入

scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...

您应该返回到您的数据并确保 custom确实是String .

关于scala - JSON 格式的 Spark DataFrame 列上的隐式模式发现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54688278/

相关文章:

scala - 如何将()注入(inject)到具有构造函数参数的类中?

apache-spark - Spark 结构化流与 ElasticSearch 集成

apache-spark - 过滤超前/滞后为特定值的行(带过滤器的窗口)

python - 为 IPython 创建 PySpark 配置文件

scala - 你如何在 scala 中编写模式匹配代码块?

arrays - Scala BigInt 数组

scala - 如何将 Array[Node] 转换为 NodeSeq?

scala - 如何模式匹配继承树中的抽象父类

apache-spark - 检查spark中矩阵每列中唯一值的数量

python - 拆分 RDD