java - Spark : Ignoring or handling DataSet select errors

标签 java apache-spark apache-spark-sql

我正在测试一些原型(prototype)应用程序。我们有带有嵌套字段的 json 数据。我正在尝试使用以下 json 和代码提取一些字段:

Feed: {name: "test",[Record: {id: 1 AllColumns: {ColA: "1",ColB: "2"}}...]}

Dataset<Row> completeRecord = sparkSession.read().json(inputPath);
final Dataset<Row> feed = completeRecord.select(completeRecord.col("Feed.Record.AllColumns"));

我有大约 2000 个包含此类记录的文件。我已经单独测试了一些文件,它们工作正常。但对于某些文件,我在第二行遇到以下错误:

org.apache.spark.sql.AnalysisException: Can't extract value from Feed#8.Record: need struct type but got string;

我不确定这里发生了什么。但我想要么优雅地处理这个错误,要么记录哪个文件有该记录。另外,有什么办法可以忽略这一点并继续处理其余文件吗?

最佳答案

根据我所学到的知识回答我自己的问题。有几种方法可以解决它。 Spark 提供了忽略损坏文件和损坏记录的选项。

要忽略损坏的文件,可以将以下标志设置为 true:

spark.sql.files.ignoreCorruptFiles=true

为了更细粒度的控制并忽略坏记录而不是忽略整个文件。您可以使用 Spark api 提供的三种模式之一。

According to DataFrameReader api

mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. PERMISSIVE : sets other fields to null when it meets a corrupted record, and puts the malformed string into a new field configured by columnNameOfCorruptRecord. When a schema is set by user, it sets null for extra fields.
DROPMALFORMED : ignores the whole corrupted records.
FAILFAST : throws an exception when it meets corrupted records.

PERMISSIVE 模式对我来说效果非常好,但是当我提供自己的模式时,Spark 用 null 填充了缺失的属性,而不是将其标记为损坏的记录。

关于java - Spark : Ignoring or handling DataSet select errors,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49290112/

相关文章:

java - Spring context.xml 和 Set.contains()

java - 将列表划分为 n 大小列表的有效方法

apache-spark - 如何在 Spark 中并行化多个数据集?

apache-spark - 它在 spark 中的 py4j gatewayServer 的入口点是什么?

java - 如何使用带有不同lucene分析器的全文索引在neo4j中进行索引和搜索?

java - Keytool 使用错误的密码创建 key

apache-spark - 使用 Airflow dag run 创建 EMR 集群,任务完成后 EMR 将终止

java - 旋转 DataFrame - Spark SQL

apache-spark - 在 spark 中,如何在不重新分配的情况下重命名数据框的列名?

scala - 从 `org.apache.spark.sql.Row` 中提取信息