scala - 单元测试 Spark 数据帧转换链接

标签 scala unit-testing apache-spark apache-spark-sql parquet

我是 scala spark 生态系统的新手,想知道对链式数据帧转换进行单元测试的最佳方法是什么。下面是我想测试的方法的代码示例

def writeToParquet(spark: SparkSession, dataFrame: DataFrame, col1: DataType1, col2:DataType2): Unit {
    dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... //couple more transformation logic
        .write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")
        .parquet("some hdfs/s3/url")        
} 

问题是 parquet 是 Unit 返回类型,这使得测试变得困难。 转换本质上是不可变的,这使得模拟和监视有点困难,这一事实进一步放大了这个问题

为了创建数据框,我在 csv 中转储了测试数据集

最佳答案

请为数据框单元测试找到简单的示例。你可以把它分成两部分。第一的。测试转换,你可以做简单的shell脚本来测试写入的文件

import com.holdenkarau.spark.testing._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.{FunSuite, Matchers}

class SomeDFTest extends FunSuite with Matchers with DataFrameSuiteBase    {
 import spark.implicits._

  test("Testing Input customer data date transformation") {


    val inputSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false)
    )
    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)

    )
    val inputData = Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    )

    val expectedData = Seq(
      Row (8, "bat","test"),
      Row(64, "mouse","test"),
      Row(-27, "horse","test")
    )

    val inputDF = spark.createDataFrame(
      spark.sparkContext.parallelize(inputData),
      StructType(inputSchema)
    )

    val expectedDF = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )


    val actual = transformSomeDf(inputDF)

    assertDataFrameEquals(actual, expectedDF) // equal



  }

  def transformSomeDf(df:DataFrame):DataFrame={
    df.withColumn("dummyColumn",lit("test"))
  }
}

Sbt.build 配置

name := "SparkTest"

version := "0.1"

scalaVersion := "2.11.8"

val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"com.holdenkarau" %% "spark-testing-base" % "2.4.0_0.11.0" % Test

)

关于scala - 单元测试 Spark 数据帧转换链接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54403226/

相关文章:

Scala Akka Play, future 不会回来

unit-testing - Jest mock /监视 Mongoose 链式(查找、排序、限制、跳过)方法

scala - 使用 Gradle 从单个源项目编译多个 jar

unit-testing - Ember CLI Controller 测试 : Uncaught TypeError: Cannot read property 'transitionToRoute' of null

php - 我如何在 PHPUnit 中断言 InstanceOf()?

scala - 使用hadoop parquet处理大数据到CSV输出

apache-spark - MLlib 的输入格式问题

scala - 编译器似乎忽略类型细化中的类型绑定(bind)

Scala 选项返回类型

database - 在 Scala 中功能性地处理数据库游标