java - Spark - 流数据帧/数据集不支持非基于时间的窗口;

标签 java apache-spark apache-spark-sql spark-streaming

我需要编写带有内部选择和分区依据的 Spark sql 查询。问题是我有 AnalysisException。 我已经在这上面花了几个小时,但用其他方法我没有成功。

异常(exception):

Exception in thread "main" org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30 ASC NULLS FIRST]
+- Project [currentTemperature#27, deviceId#28, status#29, timestamp#30, wantedTemperature#31, CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]

我认为这个查询太复杂,无法像这样实现。但我不知道要修复它。

 SparkSession spark = SparkUtils.getSparkSession("RawModel");

 Dataset<RawModel> datasetMap = readFromKafka(spark);

 datasetMap.registerTempTable("test");

 Dataset<Row> res = datasetMap.sqlContext().sql("" +
                " select deviceId, grp, avg(currentTemperature) as averageT, min(timestamp) as minTime ,max(timestamp) as maxTime, count(*) as countFrame " +
                " from (select test.*,  sum(case when status = 'false' then 1 else 0 end) over (partition by deviceId order by timestamp) as grp " +
                "  from test " +
                "  ) test " +
                " group by deviceid, grp ");

如有任何建议,我们将不胜感激。 谢谢。

最佳答案

我认为问题出在窗口化规范中:

over (partition by deviceId order by timestamp) 

分区 需要在基于时间的列上 - 在您的情况下为 timestamp 。以下应该有效:

over (partition by timestamp order by timestamp) 

这当然不会解决您查询的意图。可能会尝试以下操作:但不清楚 spark 是否支持它:

over (partition by timestamp, deviceId order by timestamp) 

即使 spark 确实支持它,它仍然会改变您查询的语义。

更新

这是一个权威的来源:来自 Tathagata Das,他是 spark streaming 的关键/核心提交者:http://apache-spark-user-list.1001560.n3.nabble.com/Does-partition-by-and-order-by-works-only-in-stateful-case-td31816.html

enter image description here

关于java - Spark - 流数据帧/数据集不支持非基于时间的窗口;,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53294809/

相关文章:

python - 如何动态地将列/值添加到 pyspark 数据框中的映射类型

java - Spark read() 有效,但 sql() 抛出数据库未找到

java - JFormattedTextField 删除用户输入

java - JFrame 中的 JPanel 中的 JScrollPane 中的 JTextPane

java - 从 Application 类重写 getSingletons 方法时,Ejb 查找失败

java - Apache Spark 使用 Java 从 CSV 读取数组 float

java - 使用 gradle 任务在 jar 中设置变量值

Java 读取 Parquet 文件到 JSON 输出

apache-spark - 使用 spark sql 创建 hive 表

scala - 如何将 Spark RDD 保存到本地文件系统