apache-spark - dataproc 上的 Spark 流抛出 FileNotFoundException

标签 apache-spark google-cloud-dataproc

当我尝试向 google dataproc 集群提交 Spark 流作业时,出现以下异常:

16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@d7bffbc{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
        at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
        at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)

完整输出here

这个错误似乎是在spark-env.sh中没有正确定义hadoop配置时发生的 - link1 , link2

是否可以在某处进行配置?有什么解决办法吗?

在本地模式下运行相同的代码工作正常:

sparkConf.setMaster("local[4]")

对于其他上下文:该作业是这样调用的:

gcloud dataproc jobs submit spark \
--cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false

这是样板设置代码:

  lazy val conf = {
    val c = new SparkConf().setAppName(this.getClass.getName)
    c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)

    if (isLocal) c.setMaster("local[4]")
    c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    c.set("spark.streaming.blockInterval", "1s")
  }

  lazy val ssc = if (checkPointingEnabled) {
    StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
  } else {
    createStreamingContext()
  }

  private def getCheckPointDirectory: String = {
    if (isLocal) localCheckPointPath else checkPointPath
  }

  private def createStreamingContext(): StreamingContext = {
    val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
    s.checkpoint(getCheckPointDirectory)
    s
  }

提前致谢

最佳答案

这是否可能不是您第一次使用给定的检查点目录运行作业,因为检查点目录中已经包含检查点?

发生这种情况是因为检查点硬编码了用于提交 YARN 应用程序的确切 jarfile 参数,并且当使用指向 GCS 的 --jars 标志在 Dataproc 上运行时,这实际上是语法糖Dataproc 会自动将 GCS 中的 jar 文件暂存在本地文件路径 /tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall- assembly-0.0.1.jar 中,该路径仅在运行期间临时使用单个作业运行,因为 Spark 无法直接从 GCS 中调用 jarfile,而无需在本地暂存。

但是,在后续作业中,之前的 tmp jarfile 将已被删除,但新作业会尝试引用硬编码到检查点数据中的旧位置。

检查点数据中的硬编码还导致了其他问题;例如,Dataproc 还使用 YARN“标签”来跟踪作业,如果在新的 YARN 应用程序中重复使用旧 Dataproc 作业的“标签”,则会与 YARN 发生冲突。要运行流应用程序,您需要首先清除检查点目录(如果可能)从头开始,然后:

  1. 在启 Action 业之前,您必须将作业 jarfile 放置在主节点上的某个位置,然后您的“--jar”标志必须指定“file:///path/on/master/node/to/jarfile.jar” .

当您指定“file:///”路径时,dataproc 知道它已经在主节点上,因此它不会重新暂存到/tmp 目录中,因此在这种情况下,检查点指向某个目录是安全的修复了主服务器上的本地目录。

您可以通过 init 操作来完成此操作,也可以提交快速的 Pig 作业(或者只是 ssh 到 master 并下载该 jar 文件):

# Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case)
gcloud dataproc jobs submit pig --cluster my-test-cluster \
    --execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"

# Submit the first attempt of the job
gcloud dataproc jobs submit spark --cluster my-test-cluster \
    --class com.company.skyfall.Skyfall \
    --jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \
    --properties spark.ui.showConsoleProgress=false
  • Dataproc 依靠spark.yarn.tags 来跟踪与作业相关的 YARN 应用程序。但是,检查点持有陈旧的 Spark.yarn.tags,这会导致 Dataproc 与似乎与旧作业关联的新应用程序混淆。
  • 目前,只要最近杀死的 jobid 保存在内存中,它只会“清理”可疑的 YARN 应用程序,因此重新启动 dataproc 代理将解决此问题。

    # Kill the job through the UI or something before the next step.
    # Now use "pig sh" to restart the dataproc agent
    gcloud dataproc jobs submit pig --cluster my-test-cluster \
        --execute "sh systemctl restart google-dataproc-agent.service"
    
    # Re-run your job without needing to change anything else,
    # it'll be fine now if you ever need to resubmit it and it
    # needs to recover from the checkpoint again.
    

    请记住,根据检查点的性质,这意味着您将无法更改在后续运行中传递的参数,因为检查点恢复用于破坏您的命令行设置。

    关于apache-spark - dataproc 上的 Spark 流抛出 FileNotFoundException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41112171/

    相关文章:

    apache-spark - PySpark 将 ArrayType(ArrayType(NoneType)) 转换为 ArrayType(ArrayType(IntegerType))

    apache-spark - 如何将 Spark 数据帧写入 Neo4j 数据库

    scala - 如何按多列过滤数据框?

    hadoop - 使用 HCFS 读取 JSON 换行文件

    apache-spark - 如何在pyspark中将列表列表合并为单个列表

    json - 在 Apache Spark 中读取多行 JSON

    google-cloud-platform - Spark-BigTable - HBase 客户端未在 Pyspark 中关闭?

    hadoop - 使用Cloud Shell连接到HDFS的问题

    Dataproc 集群中的 Scala Spark 作业返回 java.util.NoSuchElementException : None.

    apache-spark - 将 GCS 暂存目录用于 Spark 作业(在 Dataproc 上)