java - 使用带水印的追加输出模式时的结构化流异常

标签 java apache-spark spark-structured-streaming

尽管我使用的是 withWatermark(),但我在运行 spark 作业时收到以下错误消息:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

根据我在 programming guide 中看到的内容,这完全符合预期用途(和示例代码)。有谁知道可能出了什么问题?

提前致谢!

相关代码(Java 8、Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();

最佳答案

问题出在 parsed.col 中。将其替换为 col 将解决此问题。我建议始终使用 col 函数而不是 Dataset.col

Dataset.col 返回已解析的列,而col 返回未解析的列

parsed.withWatermark("timestamp", "10 minutes") 将使用同名的新列创建一个新数据集。水印信息附加在新数据集中的timestamp列,而不是parsed.col("timestamp"),所以groupBy中的列不'没有水印。

当您使用未解析的列时,Spark 会为您找出正确的列。

关于java - 使用带水印的追加输出模式时的结构化流异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45577958/

相关文章:

java - 如何在调用 JNA Native.loadLibrary 之前检查 DLL/SO 以避免 UnsatisfiedLinkError

apache-spark - 带有 jar 的 Spark 工作流程

scala - scala 中的最小最大标准化

java - 这是 Android 线程的安全使用吗?

java - 我们可以在没有 JRE 的系统中运行 Java 应用程序吗?

java - 以编程方式生成动态 RadioGroup 并对其进行 setOnCheckedChangeListener

apache-spark - Spark RDD : partitioning according to text file format

azure - 具有合并到多个表的自动加载器

java - 结构化流将 JSON 保存到 HDFS

apache-spark - 如何将托管在 HDFS 中的配置文件传递给 Spark 应用程序?