scala - How do I write streaming data from a Kafka topic to HDFS using Spark?

Tags: scala apache-spark hadoop apache-kafka hdfs

I have been trying to get this code to work for hours:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Consumer")
  .getOrCreate()

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", url)
  .option("subscribe", topic)
  .load()
  .select("value")
  .writeStream
  .format(fileFormat)
  .option("path", filePath)
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
  .awaitTermination()

It throws the following exception:
Logical Plan: 
Project [value#8] 
+- StreamingExecutionRelation KafkaV2[Subscribe[MyTopic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13] 

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) 
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) 
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) 
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) 
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) 
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) 
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) 
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) 
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)

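I have no idea what is going on. All I want is to write data from a Kafka topic to HDFS using Spark streaming. Why is this so hard, and how should I do it?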

The batch version I wrote works fine:
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", url)
  .option("subscribe", topic)
  .load()
  .selectExpr("CAST(value AS String)")
  .write
  .format(fileFormat)
  .save(filePath)

Best answer

@happy You are hitting a known bug in Structured Streaming: https://issues.apache.org/jira/browse/SPARK-25257

It happens because the start offset that is read back from disk is not deserialized. The fix will be merged in a later release.
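Until a release containing that fix is available to you, one possible workaround is to avoid resuming from the old offsets on disk by starting the query with a fresh checkpoint directory. The following is only a sketch under assumptions, not a confirmed fix from the answer: it assumes the failure occurs when the query restarts from an existing checkpoint, and the broker address, topic, output path, and checkpoint path are hypothetical placeholders. It also writes to a fresh output directory, because the file sink records committed batches in `_spark_metadata` under the output path and may skip batches whose IDs it has already seen. Progress stored in the old checkpoint is lost, so data may be re-read depending on `startingOffsets`.

```scala
import org.apache.spark.sql.SparkSession

object KafkaToHdfs {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; replace with your own broker, topic, and paths.
    val url = "localhost:9092"
    val topic = "MyTopic"
    val fileFormat = "text"

    // Assumption: a new run ID per start gives both a fresh checkpoint
    // and a fresh output directory, so no old offsets are read from disk.
    val runId = System.currentTimeMillis()
    val filePath = s"hdfs:///data/mytopic/run-$runId"
    val checkpointPath = s"hdfs:///checkpoints/mytopic/run-$runId"

    val spark = SparkSession.builder()
      .appName("Consumer")
      .getOrCreate()

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", url)
      .option("subscribe", topic)
      // With no checkpoint to resume from, choose where to start reading.
      .option("startingOffsets", "earliest")
      .load()
      // Kafka values arrive as binary; cast them as the batch version does.
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format(fileFormat)
      .option("path", filePath)
      .option("checkpointLocation", checkpointPath)
      .start()
      .awaitTermination()
  }
}
```

Keeping the checkpoint on HDFS rather than under a local `/tmp` is a design choice for this sketch, so that the location is reachable from wherever the driver is scheduled.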

This question originally appeared on Stack Overflow: https://stackoverflow.com/questions/53400975/
