scala - 如何使用 Spark SQL DataFrame 在 JDBC 中保留 window() 函数的输出?

标签 scala apache-spark apache-spark-sql

当以下代码段执行时:

...
stream
      .map(_.value())
      .flatMap(MyParser.parse(_))
      .foreachRDD(rdd => {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        val dataFrame = rdd.toDF();
        val countsDf = dataFrame.groupBy($"action", window($"time", "1 hour")).count()
        val query = countsDf.write.mode("append").jdbc(url, "stats_table", prop)
      })
....

发生此错误:java.lang.IllegalArgumentException: Can't get JDBC type for struct<start:timestamp,end:timestamp>

如何保存 org.apache.spark.sql.functions.window() 的输出?对 MySQL 数据库起作用?

最佳答案

我在使用 SPARK SQL 时遇到了同样的问题:

val query3 = dataFrame
  .groupBy(org.apache.spark.sql.functions.window($"timeStamp", "10 minutes"), $"data")
  .count()
  .writeStream
  .outputMode(OutputMode.Complete())
  .options(prop)
  .option("checkpointLocation", "file:///tmp/spark-checkpoint1")
  .option("table", "temp")
  .format("com.here.olympus.jdbc.sink.OlympusDBSinkProvider")
  .start

我通过添加一个用户定义函数解决了这个问题

val toString = udf{(window:GenericRowWithSchema) => window.mkString("-")}

对我来说 String 可以,但你可以根据需要更改函数,你甚至可以有两个函数分别返回开始和结束。

我的查询更改为:

val query3 = dataFrame
  .groupBy(org.apache.spark.sql.functions.window($"timeStamp", "10 minutes"), $"data")
  .count()
  .withColumn("window",toString($"window"))
  .writeStream
  .outputMode(OutputMode.Complete())
  .options(prop)
  .option("checkpointLocation", "file:///tmp/spark-checkpoint1")
  .option("table", "temp")
  .format("com.here.olympus.jdbc.sink.OlympusDBSinkProvider")
  .start

关于scala - 如何使用 Spark SQL DataFrame 在 JDBC 中保留 window() 函数的输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39192791/

相关文章:

arrays - 在数组或Scala Spark中的其他任何集合中迭代RDD和存储的值

sql - Spark scala : SELECT in a foreach loop returns java. lang.NullPointerException

hadoop - UNION parent rdd and child rdd before action 时会发生什么?

java - Spark DirectStream 问题

apache-spark-sql - 在 Zeppelin 中使用 %pyspark 解释器注册表时,我无法访问 %sql 中的表

scala - 如何将 Array[Row] 转换为 RDD[Row]

mysql 与 scala 和 sbt 的连接

scala - 在 scala Spark 中将训练和测试中的数据集拆分为一行

scala - Spark DataFrame 计算列

apache-spark - 如何在 SparkSQL 中使用 Dataframe 获取行迭代器