当我尝试向 google dataproc 集群提交 Spark 流作业时,出现以下异常:
16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@d7bffbc{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
完整输出here 。
这个错误似乎是在spark-env.sh中没有正确定义hadoop配置时发生的 - link1 , link2
是否可以在某处进行配置?有什么解决办法吗?
在本地模式下运行相同的代码工作正常:
sparkConf.setMaster("local[4]")
对于其他上下文:该作业是这样调用的:
gcloud dataproc jobs submit spark \
--cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false
这是样板设置代码:
lazy val conf = {
val c = new SparkConf().setAppName(this.getClass.getName)
c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)
if (isLocal) c.setMaster("local[4]")
c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
c.set("spark.streaming.blockInterval", "1s")
}
lazy val ssc = if (checkPointingEnabled) {
StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
} else {
createStreamingContext()
}
private def getCheckPointDirectory: String = {
if (isLocal) localCheckPointPath else checkPointPath
}
private def createStreamingContext(): StreamingContext = {
val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
s.checkpoint(getCheckPointDirectory)
s
}
提前致谢
最佳答案
这是否可能不是您第一次使用给定的检查点目录运行作业,因为检查点目录中已经包含检查点?
发生这种情况是因为检查点硬编码了用于提交 YARN 应用程序的确切 jarfile 参数,并且当使用指向 GCS 的 --jars
标志在 Dataproc 上运行时,这实际上是语法糖Dataproc 会自动将 GCS 中的 jar 文件暂存在本地文件路径 /tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall- assembly-0.0.1.jar
中,该路径仅在运行期间临时使用单个作业运行,因为 Spark 无法直接从 GCS 中调用 jarfile,而无需在本地暂存。
但是,在后续作业中,之前的 tmp jarfile 将已被删除,但新作业会尝试引用硬编码到检查点数据中的旧位置。
检查点数据中的硬编码还导致了其他问题;例如,Dataproc 还使用 YARN“标签”来跟踪作业,如果在新的 YARN 应用程序中重复使用旧 Dataproc 作业的“标签”,则会与 YARN 发生冲突。要运行流应用程序,您需要首先清除检查点目录(如果可能)从头开始,然后:
- 在启 Action 业之前,您必须将作业 jarfile 放置在主节点上的某个位置,然后您的“--jar”标志必须指定“file:///path/on/master/node/to/jarfile.jar” .
当您指定“file:///”路径时,dataproc 知道它已经在主节点上,因此它不会重新暂存到/tmp 目录中,因此在这种情况下,检查点指向某个目录是安全的修复了主服务器上的本地目录。
您可以通过 init 操作来完成此操作,也可以提交快速的 Pig 作业(或者只是 ssh 到 master 并下载该 jar 文件):
# Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case)
gcloud dataproc jobs submit pig --cluster my-test-cluster \
--execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"
# Submit the first attempt of the job
gcloud dataproc jobs submit spark --cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false
- Dataproc 依靠spark.yarn.tags 来跟踪与作业相关的 YARN 应用程序。但是,检查点持有陈旧的 Spark.yarn.tags,这会导致 Dataproc 与似乎与旧作业关联的新应用程序混淆。
目前,只要最近杀死的 jobid 保存在内存中,它只会“清理”可疑的 YARN 应用程序,因此重新启动 dataproc 代理将解决此问题。
# Kill the job through the UI or something before the next step.
# Now use "pig sh" to restart the dataproc agent
gcloud dataproc jobs submit pig --cluster my-test-cluster \
--execute "sh systemctl restart google-dataproc-agent.service"
# Re-run your job without needing to change anything else,
# it'll be fine now if you ever need to resubmit it and it
# needs to recover from the checkpoint again.
请记住,根据检查点的性质,这意味着您将无法更改在后续运行中传递的参数,因为检查点恢复用于破坏您的命令行设置。
关于apache-spark - dataproc 上的 Spark 流抛出 FileNotFoundException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41112171/