apache-spark - 触发流无法写入hdfs路径

标签 apache-spark hadoop apache-spark-sql hdfs spark-structured-streaming

我正在使用Java 1.8使用带有kafka 0.10.x的spark-sql-2.4.1v。

Dataset<Row> dataSet= sparkSession
                      .readStream()
                      .format("kafka")
                      .option("subscribe", INFO_TOPIC)
                      .option("startingOffsets", "latest")
                      .option("enable.auto.commit", false)
                      .option("maxOffsetsPerTrigger", 1000)
                      .option("auto.offset.reset", "latest")
                      .option("failOnDataLoss", false)
                      .load();



StreamingQuery query = dataSet.writeStream()
        .format(PARQUET_FORMAT)
        .option("path", parqetFileName)
        .option("checkpointLocation", checkPtLocation)
        .trigger(Trigger.ProcessingTime("15 seconds"))
        .start();



query.awaitTermination();

将数据写入我的hdfs路径(即parqetFileName)后,它失败并显示以下错误。
[DataStreamer for file /user/parquet/raw/part-00001-7cba7fa3-a98f-442d-9584-b71085b7cd82-c000.snappy.parquet] WARN  org.apache.hadoop.hdfs.DataStreamer - Caught exception
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        at java.lang.Thread.join(Thread.java:1323)
        at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:980)
        at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:630)
        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:807)

这里有什么问题以及如何解决?

最佳答案

您的代码中必须包含streamContext.awaitTermination(),否则应用程序将在启动流后立即退出。

关于apache-spark - 触发流无法写入hdfs路径,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58992303/

相关文章:

python - 舍入 double 值并转换为整数

Scala 是 Spark 的必备工具吗?

shell - 使用 shell 拆分数据

java - 如何在Hadoop上模拟运行一些服务器?

json - Postgresql JSONB 数据上的 Spark SQL

scala - 创建 Spark Dataframe 的摘要

java - 较新依赖项中的 Maven 不同包名

python - 使用Python的spark-on-k8s资源登台服务器

hadoop - 丢失的 block 和丢失的 block (复制因子为1)有什么区别?

java-8 - 如何在没有java堆内存错误的情况下将csv读入pyspark