java - 将包含 Json 的 Dataset<String> 转换为 Dataset<StructType>

标签 java apache-spark apache-spark-sql

Spark 非常擅长在初次从磁盘读取数据时将 JSON 解析为嵌套的 StructType ,但是如果我在 中已经有一个包含 JSON 的 String 列怎么办?数据集,我想将其映射到带有 StructType 列的 Dataset 中,并使用考虑整个数据集的模式推断,同时充分利用并行状态并避免减少行动?

我知道函数 schema_of_jsonfrom_json,它们显然是为了一起使用来完成此任务或类似的任务,但我无法找到实际的工作代码示例,尤其是 Java 代码示例。

我会接受任何提供 Java 示例并满足完整模式推断和完整非简化并行操作目标的答案。或者,如果这不可能,则采用最接近的解决方法。

我目前使用的是 Spark 2.4.0。

我研究了以下相关问题:

Implicit schema discovery on a JSON-formatted Spark DataFrame column

这个问题与我的类似,不过是针对 Scala 的。没有公认的答案。 OP 在评论中宣布,他们找到了一种“hacky”解决方案来让 from_schema 正常工作。除了“hackiness”之外,该解决方案的问题在于它仅从数据帧的第一行推断模式,因此类型可能受到过于严格的约束:

val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first

编辑:我尝试了指示的解决方案 here正如下面评论中所讨论的。这是实现:

    SparkSession spark = SparkSession
            .builder()
            .appName("example")
            .master("local[*]")
            .getOrCreate();

    Dataset<Row> df = spark.read().text(conf.getSourcePath());

    df.cache();

    String schema = df.select(schema_of_json(col("value")))
          .as(Encoders.STRING())
          .first();
    df.withColumn("parsedJson", from_json(col("value"), schema, new HashMap<String, String>()))
            .drop("value")
            .write()
            .mode("append")
            .parquet(conf.getDestinationPath());

从这段代码中我得到了一个错误:

AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [schemaofjson(value#0) AS schemaOfjson(value)#20]
+- Relation[value#0] text

此错误导致我收到以下 Spark 拉取请求: https://github.com/apache/spark/pull/22775

这似乎表明 schema_of_json 从来都不是为了对整个表进行模式推断而应用于整个表,而是从传递的单个文字 JSON 样本中推断模式直接使用 lit("some json")。在这种情况下,我不知道 Spark 是否提供任何解决方案来从整个表上的 JSON 进行完整模式推断。除非这里有人可以纠正我对此拉取请求的阅读或提供替代方法?

最佳答案

实际上有一个非常简单的解决方案,使用 DataFrameReader.json(Dataset<String>) ,不知道为什么它没有出现在我的搜索中:

    Dataset<String> ds = ...;

    spark.read()
        .json(ds)
        .write()
        .mode("append")
        .parquet(conf.getDestinationPath());

如果源数据集中有多列,显然您可以只选择要操作的一列。内容类型必须是 String (例如,不是 Row)。

关于java - 将包含 Json 的 Dataset<String> 转换为 Dataset<StructType>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56275889/

相关文章:

java - 在同一容器中定义可重用的 CDI bean

python - 将 UDF 余弦相似度应用于 Pyspark 中的分组 ML 向量时出现问题

azure - 更新 Azure 中 parquet 文件的现有记录

java - 是否有 Java 数学库允许像 numpy 那样进行矢量化和广播?

java - 如何使 JList 在 netbeans Swing 应用程序中随窗口自动拉伸(stretch)?

java - EMR Spark java 应用程序 GC 问题

java - [spark-cassandra-connector]如何在spark 2.3.1中将scala隐式支持的代码转换为java

python-2.7 - pyspark : how to generate time series? 上的 SparkSQL

java - 如何修复类文件是 Java 8 但最大支持是 Java 7

mysql - 时间戳分区键上的 Spark JoinWithCassandraTable STUCK