I have performed a simple group-by on year with some aggregations, as shown below. I tried to append the result to an hdfs path, but I am getting this error:
org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark;;
Aggregate [year#88], [year#88, sum(rating#89) AS rating#173,
sum(cast(duration#90 as bigint)) AS duration#175L]
+- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
Below is my code. Can someone please help?
val spark = SparkSession.builder().appName("mddd")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint")
  .config("spark.debug.maxToStringFields", 100)
  .getOrCreate()

val mySchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("year", IntegerType),
  StructField("rating", DoubleType),
  StructField("duration", IntegerType)
))

val xmlData = spark.readStream
  .option("sep", ",")
  .schema(mySchema)
  .csv("file:///home/sa/kafdata/")

import java.util.Calendar
val df_agg_without_time = xmlData.withColumn("event_time",
  to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

val df_agg_with_time = df_agg_without_time
  .withWatermark("event_time", "10 seconds")
  .groupBy($"year")
  .agg(sum($"rating").as("rating"), sum($"duration").as("duration"))

val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

df_agg_with_time.writeStream
  .outputMode("append")
  .partitionBy("year")
  .format("csv")
  .option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/")
  .start()
My input is in csv format:
id,name,year,rating,duration
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733
My expected output should be in hdfs, partitioned by year:
year,rating,duration
1993,7.4,8956
1921,6.0,15212
1963,10.7,17389
I am really not sure what is wrong with my approach. Please help.
Best Answer
This is a question with several aspects:
The column the watermark is defined on must take part in the aggregation. For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, because the watermark is defined on a different column than the one used in the aggregation. Simply stated: for Append mode you need a watermark on a column that is part of the groupBy. I think this is your issue — you define the watermark on event_time, but group only by year.
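To make that concrete against the asker's own schema, here is a minimal sketch of how the aggregation could be rewritten so that Append mode is accepted. It reuses the names from the question (df_agg_without_time, event_time, year); the 10-second window length is an assumption chosen to match the watermark:

```scala
import org.apache.spark.sql.functions._

// Sketch of a fix for the asker's aggregation: the watermarked column
// (event_time) now takes part in the grouping via a time window, so
// Append mode can emit each window once the watermark has passed it.
val df_agg_fixed = df_agg_without_time
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds"), $"year")
  .agg(sum($"rating").as("rating"), sum($"duration").as("duration"))
```

Each output row then carries a window column alongside year, which is also what the to_json(struct($"window")) line in the question appears to expect.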
.enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict") — these are Hive dynamic-partitioning settings; they are not what Structured Streaming's partitionBy on a file sink relies on.
So, in general, for Append mode the watermark column needs to appear in the grouping. Then:
Here is an example using socket input with spark-shell — you can extrapolate it to your own data and micro-batch output (note there is a delay before the data shows up):
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()

// create stream from socket
import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")

val socketStreamDs = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

// parse "epochMillis,symbol,value" lines into a typed DataFrame
val stockDs = socketStreamDs
  .map(value => value.trim.split(","))
  .map(entries => (new java.sql.Timestamp(entries(0).toLong), entries(1), entries(2).toDouble))
  .toDF("time", "symbol", "value")

val windowedCount = stockDs
  .withWatermark("time", "20000 milliseconds")
  .groupBy(
    window($"time", "10 seconds"),
    $"symbol"
  )
  .agg(sum("value"), count($"symbol"))

val query = windowedCount.writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Append())

query.start().awaitTermination()
The result is:
Batch: 14
+---------------------------------------------+------+----------+-------------+
|window |symbol|sum(value)|count(symbol)|
+---------------------------------------------+------+----------+-------------+
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0 |6 |
|[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0 |2 |
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0 |1 |
|[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0 |4 |
+---------------------------------------------+------+----------+-------------+
This is quite a broad topic, and you need to look at it holistically.
You can see that emitting the sums per window can be handy in some cases, as the summed output can later be used to compute an overall avg. Success.
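As a sketch of that last point: if the streamed partial aggregates were written out as csv with a header (the path /tmp/stream-output and the column names total and cnt below are assumptions, not part of the example above), an overall average per symbol could then be derived in a plain batch job:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val batchSession = SparkSession.builder.master("local").appName("post-agg").getOrCreate()
import batchSession.implicits._

// Read the partial per-window aggregates the streaming query produced.
// Path and column names are hypothetical for this sketch.
val parts = batchSession.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/stream-output")

// Overall average per symbol = sum of partial sums / sum of partial counts.
val overallAvg = parts
  .groupBy($"symbol")
  .agg((sum($"total") / sum($"cnt")).as("overall_avg"))
```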
Regarding apache-spark - spark structured streaming exception: Append output mode not supported without watermark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54117961/