java - 使用 Spark 验证 CSV 文件列

标签 java csv apache-spark

我正在尝试在 Spark 中读取 CSV 文件(应该有标题)并将数据加载到现有表中(具有预定义的列和数据类型)。 csv 文件可能非常大,因此如果 csv 中的列标题不是“有效”,我可以避免这样做,那就太好了。

当我当前读取文件时,我指定 StructType 作为架构,但这并不能验证标题是否包含按正确顺序排列的正确列。 这是我到目前为止所拥有的(我正在另一个地方构建“架构”StructType):

sqlContext
  .read()
  .format("csv")
  .schema(schema)
  .load("pathToFile");

如果我添加.option("header", "true)"它将跳过 csv 文件的第一行,并使用我在 StructType 的 add 中传递的名称。方法。 (例如,如果我使用“id”和“name”构建 StructType 并且 csv 中的第一行是“idzzz,name”,则生成的数据帧将包含“id”和“name”列。我希望能够验证csv header 的列名称与我计划加载 csv 的表的列名称相同。

我尝试使用 .head() 读取文件,并对第一行进行一些检查,但这会下载整个文件。

欢迎任何建议。

最佳答案

据我了解,您想要验证您读取的 CSV 的架构。 schema 选项的问题在于,它的目标是告诉 Spark 它是数据的 schema,而不是检查它是否是。

但是,有一个选项可以在读取 CSV 时推断所述架构,这在您的情况下可能非常有用 (inferSchema)。然后,您可以将该架构与您期望的 equals 进行比较,或者执行我将介绍的更宽松的小解决方法。

让我们看看以下文件是如何工作的:

a,b
1,abcd
2,efgh

然后,让我们读取数据。我使用了 scala REPL,但您应该能够非常轻松地将所有内容转换为 Java。

val df = spark.read
    .option("header", true) // reading the header
    .option("inferSchema", true) // infering the sschema
    .csv(".../file.csv")
// then let's define the schema you would expect
val schema = StructType(Array(StructField("a", IntegerType),
                              StructField("b", StringType)))

// And we can check that the schema spark inferred is the same as the one
// we expect:
schema.equals(df.schema)
// res14: Boolean = true

走得更远

那是一个完美的世界。事实上,如果您的架构包含例如不可为空的列或其他小的差异,则这种基于对象严格相等的解决方案将不起作用。

val schema2 = StructType(Array(StructField("a", IntegerType, false),
                               StructField("b", StringType, true)))
// the first column is non nullable, it does not work because all the columns
// are  nullable when inferred by spark:
schema2.equals(df.schema)
// res15: Boolean = false

在这种情况下,您可能需要实现适合您的架构比较方法:

def equalSchemas(s1 : StructType, s2 : StructType) = {
  s1.indices
    .map(i => s1(i).name.toUpperCase.equals(s2(i).name.toUpperCase) &&
              s1(i).dataType.equals(s2(i).dataType))
    .reduce(_ && _)
}
equalSchemas(schema2, df.schema)
// res23: Boolean = true

我正在检查列的名称和类型是否匹配并且顺序是否相同。您可能需要根据您的需要实现不同的逻辑。

关于java - 使用 Spark 验证 CSV 文件列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58911839/

相关文章:

excel - Excel 工作表中 CSV 数据的最大行数

scala - Spark : SAXParseException while writing to parquet on s3

python - PySpark:when子句中的多个条件

java - 如何以编程方式下载和安装 apk?

java - 如何让 Android 服务保持 phonegap 用户界面的活力

python - 在 CSV 文件 python 中添加带逗号的字符串

python - Apache Spark ALS - 如何执行实时推荐/折叠匿名用户

java - 如何使用 Eclipse OpenJ9 进行堆转储?

java - 如何通过反射获取多重签名调用方方法?

cakephp - 使用数组实现 Cakephp 模型