apache-spark - 如何为 Sparks 新结构化流编写集成测试?

标签 apache-spark integration-testing scalatest

尝试测试 Spark Structured Streams ......但失败了......我如何正确测试它们?

我遵循了来自 here 的一般 Spark 测试问题,而我最近的尝试是 [ 1 ] 看起来像:

import simpleSparkTest.SparkSessionTestWrapper
import org.scalatest.FunSpec  
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructType, DateType}
import org.apache.spark.sql.streaming.OutputMode

class StructuredStreamingSpec extends FunSpec with SparkSessionTestWrapper {

  describe("Structured Streaming") {

    it("Read file from system") {

      val schema = new StructType()
        .add("station_id", IntegerType)
        .add("name", StringType)
        .add("lat", DoubleType)
        .add("long", DoubleType)
        .add("dockcount", IntegerType)
        .add("landmark", StringType)
        .add("installation", DateType)

      val sourceDF = spark.readStream
        .option("header", "true")

      val countSource = sourceDF.count()

      val query = sourceDF.writeStream

      assert(countSource === 70)



可悲的是它总是以 org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start() 失败

我也发现了这个 issue在 spark-testing-base repo 并想知道是否甚至可以测试 Spark Structured Streaming?

我想进行集成测试,甚至可能在上面使用 Kafka 来测试检查点或特定的损坏数据场景。有人可以帮我吗?

最后但并非最不重要的一点是,我认为该版本可能也是一个限制因素 - 由于 Azure HDInsight 部署选项,我目前针对 2.1.0 进行开发。如果这是拖累,自托管是一种选择。



在通过调用 start() 开始执行之前,您正在对流式数据帧执行 count()。


  val results: List[Row] = spark.sql("select * from Output").collectAsList()
  assert(results.size() === 70) 

