scala - Testing a utility function by writing unit tests in Apache Spark Scala

Tags: scala unit-testing apache-spark

I have a utility function written in Scala that reads Parquet files from an S3 bucket. Can someone help me write unit test cases for it?

Below is the function that needs to be tested.

  def readParquetFile(spark: SparkSession,
                      locationPath: String): DataFrame = {
    spark.read
      .parquet(locationPath)
  }

So far, I have created a SparkSession with a local master:
import org.apache.spark.sql.SparkSession


trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("Test App").getOrCreate()
  }

}

I am stuck on testing this function. Here is the code where I am stuck. The question is whether I should create a real Parquet file and load it to check that the DataFrame is created, or whether there is a mocking framework to test this.
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.scalatest.FunSpec

class ReadAndWriteSpec extends FunSpec with DataFrameComparer with SparkSessionTestWrapper {

  import spark.implicits._

  it("reads a parquet file and creates a dataframe") {

  }

}
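
For reference, one way the empty test above could be filled in without any mocking is to write a small DataFrame to a local temporary directory as Parquet and then read it back through the utility. A minimal sketch, assuming the function is exposed as ReadAndWrite.readParquetFile as in the answer below; the temporary directory and sample rows are illustrative, not from the original post:

  it("reads a parquet file and creates a dataframe") {
    import spark.implicits._

    // Write a tiny DataFrame as Parquet to a throwaway local directory...
    val tmpDir = java.nio.file.Files.createTempDirectory("parquet-test").toString
    Seq(("Michael", 29), ("Andy", 30)).toDF("name", "age")
      .write.mode("overwrite").parquet(tmpDir)

    // ...then exercise the function under test and check the result.
    val df = ReadAndWrite.readParquetFile(spark, tmpDir)
    assert(df.count() == 2)
    assert(df.columns.toSet == Set("name", "age"))
  }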

EDIT:

Based on the input from the comments, I came up with the following, but I still cannot figure out how to make use of it.

I am using https://github.com/findify/s3mock
import com.amazonaws.auth.{AWSStaticCredentialsProvider, AnonymousAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import io.findify.s3mock.S3Mock
import org.scalatest.FunSpec

class ReadAndWriteSpec extends FunSpec with DataFrameComparer with SparkSessionTestWrapper {

  import spark.implicits._

  it("reads a parquet file and creates a dataframe") {

    val api = S3Mock(port = 8001, dir = "/tmp/s3")
    api.start

    val endpoint = new EndpointConfiguration("http://localhost:8001", "us-west-2")
    val client = AmazonS3ClientBuilder
      .standard
      .withPathStyleAccessEnabled(true)
      .withEndpointConfiguration(endpoint)
      .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
      .build

    /** Use it as usual. */
    client.createBucket("foo")
    client.putObject("foo", "bar", "baz")
    val url = client.getUrl("foo","bar")

    println(url.getFile())

    val df = ReadAndWrite.readParquetFile(spark,url.getPath())
    df.printSchema()

  }

}
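
If the goal is to actually read from the mocked S3 endpoint through Spark, the SparkSession also needs to be pointed at it via Hadoop's s3a connector, and the object in the bucket has to be real Parquet data rather than the string "baz". A rough sketch of that wiring, intended to go inside the test body above after api.start, assuming hadoop-aws and a matching aws-java-sdk are on the test classpath (bucket name, port and dummy credentials are illustrative):

    // Route s3a:// paths to the local S3Mock endpoint.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.endpoint", "http://localhost:8001")
    hadoopConf.set("fs.s3a.path.style.access", "true")
    hadoopConf.set("fs.s3a.connection.ssl.enabled", "false")
    hadoopConf.set("fs.s3a.access.key", "dummy") // s3mock accepts any credentials
    hadoopConf.set("fs.s3a.secret.key", "dummy")

    // Put real Parquet data into the mock bucket via Spark itself...
    import spark.implicits._
    Seq(("Michael", 29), ("Andy", 30)).toDF("name", "age")
      .write.mode("overwrite").parquet("s3a://foo/people")

    // ...and read it back through the utility under test.
    val df = ReadAndWrite.readParquetFile(spark, "s3a://foo/people")
    assert(df.count() == 2)

    api.shutdown // stop the mock once the test is done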

Best answer

I figured it out and kept it simple. I was able to get some basic test cases done.

Here is my solution. I hope it helps someone.

import org.apache.spark.sql
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import loaders.ReadAndWrite

class ReadAndWriteTestSpec extends FunSuite with BeforeAndAfterEach{

  private val master = "local"

  private val appName = "ReadAndWrite-Test"

  var spark : SparkSession = _

  override def beforeEach(): Unit = {
    spark = new sql.SparkSession.Builder().appName(appName).master(master).getOrCreate()
  }

  test("creating data frame from parquet file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = spark.read.json("src/test/resources/people.json")
    peopleDF.write.mode(SaveMode.Overwrite).parquet("src/test/resources/people.parquet")

    val df = ReadAndWrite.readParquetFile(sparkSession,"src/test/resources/people.parquet")
    df.printSchema()

  }


  test("creating data frame from text file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()
  }

  test("counts should match with number of records in a text file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()

    assert(peopleDF.count() == 3)
  }

  test("data should match with sample records in a text file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()

    assert(peopleDF.take(1)(0)(0).equals("Michael"))
  }

  test("Write a data frame as csv file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()

    // the header argument should ideally be a Boolean, to avoid confusing the caller
    ReadAndWrite.writeDataframeAsCSV(peopleDF,"src/test/resources/out.csv",java.time.Instant.now().toString,",","true")
  }

  override def afterEach(): Unit = {
    spark.stop()
  }

}

case class Person(name: String, age: Int)
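
readTextfileToDataSet and writeDataframeAsCSV are referenced by the tests but are not shown in the post. Purely for context, here is a hypothetical version of the utility object with signatures inferred from the call sites; the implementations below are a guess, not the author's code (package declaration omitted):

import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object ReadAndWrite {

  def readParquetFile(spark: SparkSession, locationPath: String): DataFrame =
    spark.read.parquet(locationPath)

  // Returns one element per line of the text file.
  def readTextfileToDataSet(spark: SparkSession, locationPath: String): Dataset[String] =
    spark.read.textFile(locationPath)

  // Writes the frame as CSV under outputPath/timestamp; the header flag is passed
  // through as a string ("true"/"false"), matching the call in the test above.
  def writeDataframeAsCSV(df: DataFrame,
                          outputPath: String,
                          timestamp: String,
                          delimiter: String,
                          header: String): Unit =
    df.write
      .mode(SaveMode.Overwrite)
      .option("header", header)
      .option("delimiter", delimiter)
      .csv(s"$outputPath/$timestamp")
}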

On "scala - Testing a utility function by writing unit tests in Apache Spark Scala", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55831503/
