scala - Load constraints from a CSV file (Amazon Deequ)

Tags: scala apache-spark amazon-deequ

I'm looking at Deequ and it seems like a really nice library. I'd like to know whether it is possible to load constraints from a CSV file or an ORC table in HDFS.

Suppose I have a table with these types:

case class Item(
  id: Long,
  productName: String,
  description: String,
  priority: String,
  numViews: Long
)

I would like to apply constraints like the following:

val checks = Check(CheckLevel.Error, "unit testing my data")
                  .isComplete("id") // should never be NULL
                  .isUnique("id") // should not contain duplicates

But I want to load the .isComplete("id") and .isUnique("id") parts from a CSV file, so that the business can add constraints and we can run the tests based on their input:


val verificationResult = VerificationSuite()
  .onData(data)
  .addChecks(Seq(checks))
  .run()

I have managed to get the constraints from suggestionResult.constraintSuggestions:

val allConstraints = suggestionResult.constraintSuggestions
      .flatMap { case (_, suggestions) => suggestions.map { _.constraint }}
      .toSeq

which gives a list such as:

allConstraints = List(CompletenessConstraint(Completeness(id,None)), ComplianceConstraint(Compliance('id' has no negative values,id >= 0,None)))

But these constraints are generated from suggestionResult.constraintSuggestions. What I want is to build a similar list based on the contents of a CSV file. Can someone help me with this?
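
For context, the suggestionResult above comes from Deequ's constraint suggestion runner, roughly like this (a sketch using the default rule set):

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}

// profile the data and let Deequ suggest constraints for every column
val suggestionResult = ConstraintSuggestionRunner()
  .onData(data)
  .addConstraintRules(Rules.DEFAULT)
  .run()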

To sum up: basically I just want to add

val checks = Check(CheckLevel.Error, "unit testing my data")
  .isComplete("columnName1")
  .isUnique("columnName1")
  .isComplete("columnName2")

dynamically, based on a file that contains, for example:

columnName;isUnique;isComplete (header)
columnName1;true;true
columnName2;false;true

Best Answer

I chose to store the CSV in src/main/resources, since it is very easy to read from there and easy to maintain in parallel with the QA code.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

def readCSV(spark: SparkSession, filename: String): DataFrame = {
  import spark.implicits._

  // getResourceAsStream returns null when the resource is missing, so wrap it in Option
  val inputFileStream = Option(this.getClass.getResourceAsStream("/" + filename))
    .getOrElse(
      throw new Exception("Cannot find " + filename + " in src/main/resources")
    )

  // read the resource line by line and parallelize it into a Dataset[String]
  val readLines =
    scala.io.Source.fromInputStream(inputFileStream).getLines.toList

  val csvData: Dataset[String] =
    spark.sparkContext.parallelize(readLines).toDS

  // let Spark parse the header and infer the column types
  spark.read.option("header", true).option("inferSchema", true).csv(csvData)
}
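
For example, assuming the constraints file is saved as constraints.csv in src/main/resources (the file name here is just for illustration):

// load the business-maintained constraint definitions as a DataFrame
val constraintsDF = readCSV(spark, "constraints.csv")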

This loads it as a DataFrame, which can then be passed straight into code such as gavincruick's example on GitHub, copied here for convenience:

//code to build verifier from DF that has a 'Constraint' column
import scala.util.Try
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
import org.apache.spark.sql.DataFrame
import com.amazon.deequ.VerificationResult

type Verifier = DataFrame => VerificationResult

def generateVerifier(df: DataFrame, columnName: String): Try[Verifier] = {

  val constraintCheckCodes: Seq[String] = df.select(columnName).collect().map(_(0).toString).toSeq

  def checkSrcCode(checkCodeMethod: String, id: Int): String = s"""com.amazon.deequ.checks.Check(com.amazon.deequ.checks.CheckLevel.Error, "$id")$checkCodeMethod"""

  val verifierSrcCode = s"""{
                             |import com.amazon.deequ.constraints.ConstrainableDataTypes
                             |import com.amazon.deequ.{VerificationResult, VerificationSuite}
                             |import org.apache.spark.sql.DataFrame
                             |
                             |val checks = Seq(
                             |  ${constraintCheckCodes.zipWithIndex
                           .map { (checkSrcCode _).tupled }
                           .mkString(",\n  ")}
                             |)
                             |
                             |(data: DataFrame) => VerificationSuite().onData(data).addChecks(checks).run()
                             |}
    """.stripMargin.trim

  println(s"Verification function source code:\n$verifierSrcCode\n")

  compile[Verifier](verifierSrcCode)
}

/** Compiles the scala source code that, when evaluated, produces a value of type T. */
def compile[T](source: String): Try[T] =
  Try {
    val toolbox = currentMirror.mkToolBox()
    val tree = toolbox.parse(source)
    val compiledCode = toolbox.compile(tree)
    compiledCode().asInstanceOf[T]
  }

//example usage...

//sample test data
val testDataDF = Seq(
      ("2020-02-12", "England", "E10000034", "Worcestershire", 1),
      ("2020-02-12", "Wales", "W11000024", "Powys", 0),
      ("2020-02-12", "Wales", null, "Unknown", 1),
      ("2020-02-12", "Canada", "MADEUP", "Ontario", 1)
  ).toDF("Date", "Country", "AreaCode", "Area", "TotalCases")

//constraints in a DF
val constraintsDF = Seq(
    (".isComplete(\"Area\")"),
    (".isComplete(\"Country\")"),
    (".isComplete(\"TotalCases\")"),
    (".isComplete(\"Date\")"),
    (".hasCompleteness(\"AreaCode\", _ >= 0.80, Some(\"It should be above 0.80!\"))"),
    (".isContainedIn(\"Country\", Array(\"England\", \"Scotland\", \"Wales\", \"Northern Ireland\"))")
  ).toDF("Constraint")

//Build Verifier from constraints DF
val verifier = generateVerifier(constraintsDF, "Constraint").get

//Run verifier against a sample DF 
val result = verifier(testDataDF)

//display results
VerificationResult.checkResultsAsDataFrame(spark, result).show()
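
If runtime compilation feels too heavyweight, an alternative (not part of the original answer) is to map the question's columnName;isUnique;isComplete layout directly onto Deequ's Check builder by folding over the rows of the config DataFrame. A minimal sketch, assuming inferSchema parses the boolean columns as booleans (note the sample file in the question is semicolon-delimited, so readCSV would also need .option("delimiter", ";")):

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.DataFrame

// fold over the config rows, chaining the requested constraint methods onto one Check
def checkFromConfig(configDF: DataFrame): Check =
  configDF.collect().foldLeft(Check(CheckLevel.Error, "checks loaded from csv")) {
    (check, row) =>
      val column = row.getAs[String]("columnName")
      val withUnique =
        if (row.getAs[Boolean]("isUnique")) check.isUnique(column) else check
      if (row.getAs[Boolean]("isComplete")) withUnique.isComplete(column) else withUnique
  }

// usage: val result = VerificationSuite().onData(data).addCheck(checkFromConfig(configDF)).run()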

Regarding scala - loading constraints from a CSV file (Amazon Deequ), a similar question was found on Stack Overflow: https://stackoverflow.com/questions/58453541/
