apache-spark - 如何在 foreachBatch 中使用临时表?

标签 apache-spark spark-structured-streaming

我们正在构建一个流媒体平台,在该平台上,批处理 SQL 是必不可少的。

val query = streamingDataSet.writeStream.option("checkpointLocation", checkPointLocation).foreachBatch { (df, batchId) => {

      df.createOrReplaceTempView("events")

      val df1 = ExecutionContext.getSparkSession.sql("select * from events")

      df1.limit(5).show()
      // More complex processing on dataframes

    }}.trigger(trigger).outputMode(outputMode).start()

query.awaitTermination()

抛出的错误是:

org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: events
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'events' not found in database 'default';

流媒体源是带有水印的 Kafka,无需使用 Spark-SQL,我们就能够执行数据帧转换。 Spark 版本是 2.4.0,Scala 是 2.11.7。 Trigger 是 ProcessingTime 每 1 分钟一次,OutputMode 是 Append。

是否有任何其他方法可以促进在 foreachBatch 中使用 spark-sql?它会与 Spark 的升级版本一起使用吗 - 在这种情况下我们升级到版本吗? 请帮忙。谢谢。

最佳答案

tl;drExecutionContext.getSparkSession 替换为 df.sparkSession


StreamingQueryException 的原因是流式查询试图访问 SparkSession 中的 events 临时表,对此一无所知,即 ExecutionContext.getSparkSession

唯一注册了此events 临时表的SparkSession 正是SparkSession df 数据帧被创建内,即 df.sparkSession

关于apache-spark - 如何在 foreachBatch 中使用临时表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58914951/

相关文章:

scala - 如何在 spark 3.0 结构化流媒体中使用 kafka.group.id 和检查点以继续从 Kafka 中读取它在重启后停止的位置?

apache-spark - org.apache.spark.sql.AnalysisException : 'write' can not be called on streaming Dataset/DataFrame

apache-spark - 如果在提供给 kafka 的数据中遇到意外格式,当您重新启动 spark 作业时会发生什么

sql - 如何将 String 值转换(或强制转换)为 Integer 值?

apache-spark - Spark : How do I write individual row to S3/HDFS as JPG

json - 如何将内存中的 JSON 字符串读入 Spark DataFrame

apache-spark - 使用套接字的 Spark Structured Streaming,设置 SCHEMA,在控制台中显示 DATAFRAME

scala - 使用 “When Otherwise”时增长超过64 KB错误

scala - 如何使用spark sc.textFile获取文件名?

apache-spark - Spark 结构化流与 ElasticSearch 集成