apache-spark - Why does Complete output mode require aggregation?

Tags: apache-spark, spark-structured-streaming

I am using the latest Structured Streaming in Apache Spark 2.2, but I get the following exception:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")  // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided

Best Answer

From the Structured Streaming Programming Guide, section "Other queries" (i.e. queries without aggregations, mapGroupsWithState, or flatMapGroupsWithState):

Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.



To answer the question:

What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?



Most likely an OutOfMemoryError. Complete mode re-emits the entire Result Table on every trigger, so without an aggregation to bound it, the table would have to retain every input row ever seen, and memory usage would grow without limit.
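For contrast, here is a minimal sketch (assuming the same spark-shell session and MemoryStream source as above) of a query that Complete mode does accept: a groupBy/count is a streaming aggregation, so the Result Table is bounded to one row per key.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]

// groupBy + count is a streaming aggregation: the Result Table holds
// at most one row per id, so Spark can feasibly re-emit it in full
// on every trigger -- which is exactly what Complete mode does.
val counts = source.toDS.toDF("time", "id").
  groupBy("id").
  count

val q = counts.
  writeStream.
  format("memory").
  queryName("counts").
  outputMode(OutputMode.Complete).  // <-- accepted: the query aggregates
  start
```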

The puzzling part is why dropDuplicates("id") is not considered an aggregation.
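If the goal is simply deduplication, a sketch of a workaround (assuming the same session as above): keep dropDuplicates but use Append output mode, which streaming deduplication does support. Each deduplicated row is then emitted exactly once instead of the whole Result Table being re-emitted.

```scala
import org.apache.spark.sql.streaming.OutputMode

// Append mode works with dropDuplicates: each row surviving
// deduplication is emitted once. Note that without a watermark the
// state tracking already-seen ids still grows without bound.
val deduped = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp").
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")

val q = deduped.
  writeStream.
  format("memory").
  queryName("dups").
  outputMode(OutputMode.Append).  // <-- accepted: no Complete-mode restriction
  start
```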

A similar question on Stack Overflow: https://stackoverflow.com/questions/45756997/
