apache-spark - 如何为 Sparks 新结构化流编写集成测试?

标签 apache-spark integration-testing scalatest

尝试测试 Spark Structured Streams ......但失败了......我如何正确测试它们?

我遵循了来自 here 的一般 Spark 测试问题,而我最近的尝试是 [ 1 ] 看起来像:

import simpleSparkTest.SparkSessionTestWrapper
import org.scalatest.FunSpec  
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructType, DateType}
import org.apache.spark.sql.streaming.OutputMode

class StructuredStreamingSpec extends FunSpec with SparkSessionTestWrapper {

  describe("Structured Streaming") {

    it("Read file from system") {

      val schema = new StructType()
        .add("station_id", IntegerType)
        .add("name", StringType)
        .add("lat", DoubleType)
        .add("long", DoubleType)
        .add("dockcount", IntegerType)
        .add("landmark", StringType)
        .add("installation", DateType)

      val sourceDF = spark.readStream
        .option("header", "true")
        .schema(schema)
        .csv("/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv")
        .coalesce(1)

      val countSource = sourceDF.count()

      val query = sourceDF.writeStream
        .format("memory")
        .queryName("Output")
        .outputMode(OutputMode.Append())
        .start()
        .processAllAvailable()

      assert(countSource === 70)
    }

  }

}

可悲的是它总是以 org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start() 失败

我也发现了这个 issue在 spark-testing-base repo 并想知道是否甚至可以测试 Spark Structured Streaming?

我想进行集成测试,甚至可能在上面使用 Kafka 来测试检查点或特定的损坏数据场景。有人可以帮我吗?

最后但并非最不重要的一点是,我认为该版本可能也是一个限制因素 - 由于 Azure HDInsight 部署选项,我目前针对 2.1.0 进行开发。如果这是拖累,自托管是一种选择。

最佳答案

你解决了吗?

在通过调用 start() 开始执行之前,您正在对流式数据帧执行 count()。
如果你想要一个计数,这样做怎么样?

  sourceDF.writeStream
    .format("memory")
    .queryName("Output")
    .outputMode(OutputMode.Append())
    .start()
    .processAllAvailable()

  val results: List[Row] = spark.sql("select * from Output").collectAsList()
  assert(results.size() === 70) 

关于apache-spark - 如何为 Sparks 新结构化流编写集成测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49521319/

相关文章:

testing - 集成测试应该独立吗?

java - ScalaTest - java.lang.IncompatibleClassChangeError : Implementing class

python - Spark VectorAssembler 错误 - PySpark 2.3 - Python

streaming - 如何使 Spark 分区具有粘性,即与节点保持一致?

java - 在嵌入式 Tomcat 7 中部署 WAR

javascript - 如何使用 npm 库提供异步集成测试?

scala - 我们如何对 Spark RDD 中的数据进行排序和分组?

python - Pyspark - 2 个数据帧之间的区别 - 识别插入、更新和删除

scala - 测试 Right[Seq[MyClass]] 和属性

junit - 如何防止 sbt 运行集成测试?