当以下代码段执行时:
...
stream
.map(_.value())
.flatMap(MyParser.parse(_))
.foreachRDD(rdd => {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val dataFrame = rdd.toDF();
val countsDf = dataFrame.groupBy($"action", window($"time", "1 hour")).count()
val query = countsDf.write.mode("append").jdbc(url, "stats_table", prop)
})
....
发生此错误:java.lang.IllegalArgumentException: Can't get JDBC type for struct<start:timestamp,end:timestamp>
如何保存 org.apache.spark.sql.functions.window()
的输出?对 MySQL 数据库起作用?
最佳答案
我在使用 SPARK SQL 时遇到了同样的问题:
val query3 = dataFrame
.groupBy(org.apache.spark.sql.functions.window($"timeStamp", "10 minutes"), $"data")
.count()
.writeStream
.outputMode(OutputMode.Complete())
.options(prop)
.option("checkpointLocation", "file:///tmp/spark-checkpoint1")
.option("table", "temp")
.format("com.here.olympus.jdbc.sink.OlympusDBSinkProvider")
.start
我通过添加一个用户定义函数解决了这个问题
val toString = udf{(window:GenericRowWithSchema) => window.mkString("-")}
对我来说 String 可以,但你可以根据需要更改函数,你甚至可以有两个函数分别返回开始和结束。
我的查询更改为:
val query3 = dataFrame
.groupBy(org.apache.spark.sql.functions.window($"timeStamp", "10 minutes"), $"data")
.count()
.withColumn("window",toString($"window"))
.writeStream
.outputMode(OutputMode.Complete())
.options(prop)
.option("checkpointLocation", "file:///tmp/spark-checkpoint1")
.option("table", "temp")
.format("com.here.olympus.jdbc.sink.OlympusDBSinkProvider")
.start
关于scala - 如何使用 Spark SQL DataFrame 在 JDBC 中保留 window() 函数的输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39192791/