Spark 非常擅长在初次从磁盘读取数据时将 JSON 解析为嵌套的 StructType
,但是如果我在 中已经有一个包含 JSON 的
,我想将其映射到带有 String
列怎么办?数据集StructType
列的 Dataset
中,并使用考虑整个数据集的模式推断,同时充分利用并行状态并避免减少行动?
我知道函数 schema_of_json
和 from_json
,它们显然是为了一起使用来完成此任务或类似的任务,但我无法找到实际的工作代码示例,尤其是 Java 代码示例。
我会接受任何提供 Java 示例并满足完整模式推断和完整非简化并行操作目标的答案。或者,如果这不可能,则采用最接近的解决方法。
我目前使用的是 Spark 2.4.0。
我研究了以下相关问题:
Implicit schema discovery on a JSON-formatted Spark DataFrame column
这个问题与我的类似,不过是针对 Scala 的。没有公认的答案。 OP 在评论中宣布,他们找到了一种“hacky”解决方案来让 from_schema
正常工作。除了“hackiness”之外,该解决方案的问题在于它仅从数据帧的第一行推断模式,因此类型可能受到过于严格的约束:
val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first
编辑:我尝试了指示的解决方案 here正如下面评论中所讨论的。这是实现:
SparkSession spark = SparkSession
.builder()
.appName("example")
.master("local[*]")
.getOrCreate();
Dataset<Row> df = spark.read().text(conf.getSourcePath());
df.cache();
String schema = df.select(schema_of_json(col("value")))
.as(Encoders.STRING())
.first();
df.withColumn("parsedJson", from_json(col("value"), schema, new HashMap<String, String>()))
.drop("value")
.write()
.mode("append")
.parquet(conf.getDestinationPath());
从这段代码中我得到了一个错误:
AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [schemaofjson(value#0) AS schemaOfjson(value)#20]
+- Relation[value#0] text
此错误导致我收到以下 Spark 拉取请求: https://github.com/apache/spark/pull/22775
这似乎表明 schema_of_json
从来都不是为了对整个表进行模式推断而应用于整个表,而是从传递的单个文字 JSON 样本中推断模式直接使用 lit("some json")
。在这种情况下,我不知道 Spark 是否提供任何解决方案来从整个表上的 JSON 进行完整模式推断。除非这里有人可以纠正我对此拉取请求的阅读或提供替代方法?
最佳答案
实际上有一个非常简单的解决方案,使用 DataFrameReader.json(Dataset<String>)
,不知道为什么它没有出现在我的搜索中:
Dataset<String> ds = ...;
spark.read()
.json(ds)
.write()
.mode("append")
.parquet(conf.getDestinationPath());
如果源数据集中有多列,显然您可以只选择要操作的一列。内容类型必须是 String
(例如,不是 Row
)。
关于java - 将包含 Json 的 Dataset<String> 转换为 Dataset<StructType>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56275889/