我正在 Scala 阅读中编写 ETL Spark (2.4) 作业 ;
- 在 S3 上使用 glob 模式分隔的 CSV 文件。数据加载到 DataFrame 中,并包含一个列(假设它名为 custom
)和一个 JSON 格式的字符串( 多级嵌套 )。目标是自动从该列推断架构,以便可以在 S3 中为 Parquet 文件上的写接收器构造它。
这篇文章 ( How to query JSON data column using Spark DataFrames? ) 建议 schema_of_json
从 Spark 2.4 开始,可以从 JSON 格式的列或字符串推断架构。
这是我尝试过的:
val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first
df.withColumn(
"nestedCustom",
from_json(col("custom"), jsonSchema, Map[String, String]())
)
但以上不起作用并引发此异常:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]
请记住,我正在过滤掉
custom
上的空值对于这个数据帧。编辑:整个代码如下。
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
/**
* RandomName entry point.
*
* @author Random author
*/
object RandomName {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder
.appName("RandomName")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
.getOrCreate
import spark.implicits._
val randomName: RandomName = new RandomName(spark)
val df: sql.DataFrame = randomName.read().filter($"custom".isNotNull)
val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first
df.withColumn(
"nestedCustom",
from_json(col("custom"), jsonSchema, Map[String, String]())
)
df.show
spark.stop
}
}
class RandomName(private val spark: SparkSession) {
/**
* Reads CSV files from S3 and creates a sql.DataFrame.
*
* @return a sql.DataFrame
*/
def read(): sql.DataFrame = {
val tableSchema = StructType(
Array(
StructField("a", StringType, true),
StructField("b", StringType, true),
StructField("c", DateType, true),
StructField("custom", StringType, true)
))
spark.read
.format("csv")
.option("sep", ";")
.option("header", "true")
.option("inferSchema", "true")
.schema(tableSchema)
.load("s3://random-bucket/*")
}
}
以及一个 JSON 示例:
{
"lvl1": {
"lvl2a": {
"lvl3a": {
"lvl4a": "random_data",
"lvl4b": "random_data"
}
},
"lvl2b": {
"lvl3a": {
"lvl4a": "ramdom_data"
},
"lvl3b": {
"lvl4a": "random_data",
"lvl4b": "random_data"
}
}
}
}
最佳答案
这是 custom
的指标不是 schema_of_json
的有效输入
scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...
您应该返回到您的数据并确保
custom
确实是String
.
关于scala - JSON 格式的 Spark DataFrame 列上的隐式模式发现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54688278/