java - Spark Streaming: class cast exception for SerializedOffset

Tags: java scala apache-spark spark-streaming

I am writing a custom Spark Structured Streaming source in Java/Scala (using the v2 interface and Spark 2.3.0).

While testing the integration with Spark offsets/checkpointing, I get the following error:

18/06/20 11:58:49 ERROR MicroBatchExecution: Query [id = 58ec2604-3b04-4912-9ba8-c757d930ac05, runId = 5458caee-6ef7-4864-9968-9cb843075458] terminated with error
java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Here is my Offset implementation (a simplified version; I have removed the JSON (de)serialization):

package mypackage

import org.apache.spark.sql.execution.streaming.SerializedOffset
import org.apache.spark.sql.sources.v2.reader.streaming.Offset

// Offset for the custom v2 source; `json` is what ends up in the checkpoint.
case class MyOffset(offset: Long) extends Offset {
  override val json: String = s"""{"offset":$offset}"""
}

private object MyOffset {
  // Simplified for this question: the real code parses the serialized JSON.
  def apply(offset: SerializedOffset): MyOffset = new MyOffset(0L)
}
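For context, SerializedOffset comes from the old org.apache.spark.sql.execution.streaming (v1) package, while the cast in the stack trace targets the v2 org.apache.spark.sql.sources.v2.reader.streaming.Offset. In the Spark 2.3 v2 API, checkpointed JSON is handed back to the source through MicroBatchReader.deserializeOffset. A minimal sketch of a reader for the source above, assuming a hypothetical MyMicroBatchReader class (the digit-extraction parse is a stand-in for the JSON handling removed from the question, and the read plan is a dummy):

import java.util.Optional

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.StructType

// Sketch only: shows where checkpointed JSON re-enters the v2 API.
class MyMicroBatchReader extends MicroBatchReader {
  private var startOff: MyOffset = MyOffset(0L)
  private var endOff: MyOffset = MyOffset(0L)

  // Called by the engine to rebuild our offset type from checkpoint JSON.
  // Naive digit extraction stands in for real JSON parsing.
  override def deserializeOffset(json: String): Offset =
    MyOffset(json.filter(_.isDigit).toLong)

  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    startOff = start.orElse(MyOffset(0L)).asInstanceOf[MyOffset]
    endOff = end.orElse(MyOffset(0L)).asInstanceOf[MyOffset]
  }

  override def getStartOffset(): Offset = startOff
  override def getEndOffset(): Offset = endOff
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()

  // Dummy plan: a real source returns factories that read actual data.
  override def readSchema(): StructType = new StructType().add("value", "long")
  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] =
    java.util.Collections.emptyList()
}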

Do you have any suggestions on how to fix this?

Best Answer

Check that your client application's Spark version is exactly the same as the cluster's Spark version. I was using Spark v2.4.0 dependencies in my Spark job application, but the cluster was running Spark engine v2.3.0. When I downgraded the dependencies to v2.3.0, the error disappeared.
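In build terms, that means pinning the Spark modules to the cluster's exact version and marking them "provided" so the cluster's own jars are used at runtime. A minimal sbt sketch, assuming an sbt build with the usual spark-core/spark-sql modules:

// build.sbt -- keep in lockstep with the Spark engine version on the cluster
val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided"
)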

Regarding java - Spark Streaming: class cast exception for SerializedOffset, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50945795/
