尝试测试 Spark Structured Streams ......但失败了......我如何正确测试它们?
我遵循了来自 here 的一般 Spark 测试问题,而我最近的尝试是 [ 1 ] 看起来像:
import simpleSparkTest.SparkSessionTestWrapper
import org.scalatest.FunSpec
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructType, DateType}
import org.apache.spark.sql.streaming.OutputMode
class StructuredStreamingSpec extends FunSpec with SparkSessionTestWrapper {
describe("Structured Streaming") {
it("Read file from system") {
val schema = new StructType()
.add("station_id", IntegerType)
.add("name", StringType)
.add("lat", DoubleType)
.add("long", DoubleType)
.add("dockcount", IntegerType)
.add("landmark", StringType)
.add("installation", DateType)
val sourceDF = spark.readStream
.option("header", "true")
.schema(schema)
.csv("/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv")
.coalesce(1)
val countSource = sourceDF.count()
val query = sourceDF.writeStream
.format("memory")
.queryName("Output")
.outputMode(OutputMode.Append())
.start()
.processAllAvailable()
assert(countSource === 70)
}
}
}
可悲的是它总是以
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
失败我也发现了这个 issue在 spark-testing-base repo 并想知道是否甚至可以测试 Spark Structured Streaming?
我想进行集成测试,甚至可能在上面使用 Kafka 来测试检查点或特定的损坏数据场景。有人可以帮我吗?
最后但并非最不重要的一点是,我认为该版本可能也是一个限制因素 - 由于 Azure HDInsight 部署选项,我目前针对 2.1.0 进行开发。如果这是拖累,自托管是一种选择。
最佳答案
你解决了吗?
在通过调用 start() 开始执行之前,您正在对流式数据帧执行 count()。
如果你想要一个计数,这样做怎么样?
sourceDF.writeStream
.format("memory")
.queryName("Output")
.outputMode(OutputMode.Append())
.start()
.processAllAvailable()
val results: List[Row] = spark.sql("select * from Output").collectAsList()
assert(results.size() === 70)
关于apache-spark - 如何为 Sparks 新结构化流编写集成测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49521319/