我正在尝试在 Spark 中读取 CSV 文件(应该有标题)并将数据加载到现有表中(具有预定义的列和数据类型)。 csv 文件可能非常大,因此如果 csv 中的列标题不是“有效”,我可以避免这样做,那就太好了。
当我当前读取文件时,我指定 StructType 作为架构,但这并不能验证标题是否包含按正确顺序排列的正确列。 这是我到目前为止所拥有的(我正在另一个地方构建“架构”StructType):
sqlContext
.read()
.format("csv")
.schema(schema)
.load("pathToFile");
如果我添加.option("header", "true)"
它将跳过 csv 文件的第一行,并使用我在 StructType 的 add
中传递的名称。方法。 (例如,如果我使用“id”和“name”构建 StructType 并且 csv 中的第一行是“idzzz,name”,则生成的数据帧将包含“id”和“name”列。我希望能够验证csv header 的列名称与我计划加载 csv 的表的列名称相同。
我尝试使用 .head()
读取文件,并对第一行进行一些检查,但这会下载整个文件。
欢迎任何建议。
最佳答案
据我了解,您想要验证您读取的 CSV 的架构。 schema 选项的问题在于,它的目标是告诉 Spark 它是数据的 schema,而不是检查它是否是。
但是,有一个选项可以在读取 CSV 时推断所述架构,这在您的情况下可能非常有用 (inferSchema
)。然后,您可以将该架构与您期望的 equals
进行比较,或者执行我将介绍的更宽松的小解决方法。
让我们看看以下文件是如何工作的:
a,b
1,abcd
2,efgh
然后,让我们读取数据。我使用了 scala REPL,但您应该能够非常轻松地将所有内容转换为 Java。
val df = spark.read
.option("header", true) // reading the header
.option("inferSchema", true) // infering the sschema
.csv(".../file.csv")
// then let's define the schema you would expect
val schema = StructType(Array(StructField("a", IntegerType),
StructField("b", StringType)))
// And we can check that the schema spark inferred is the same as the one
// we expect:
schema.equals(df.schema)
// res14: Boolean = true
走得更远
那是一个完美的世界。事实上,如果您的架构包含例如不可为空的列或其他小的差异,则这种基于对象严格相等的解决方案将不起作用。
val schema2 = StructType(Array(StructField("a", IntegerType, false),
StructField("b", StringType, true)))
// the first column is non nullable, it does not work because all the columns
// are nullable when inferred by spark:
schema2.equals(df.schema)
// res15: Boolean = false
在这种情况下,您可能需要实现适合您的架构比较方法:
def equalSchemas(s1 : StructType, s2 : StructType) = {
s1.indices
.map(i => s1(i).name.toUpperCase.equals(s2(i).name.toUpperCase) &&
s1(i).dataType.equals(s2(i).dataType))
.reduce(_ && _)
}
equalSchemas(schema2, df.schema)
// res23: Boolean = true
我正在检查列的名称和类型是否匹配并且顺序是否相同。您可能需要根据您的需要实现不同的逻辑。
关于java - 使用 Spark 验证 CSV 文件列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58911839/