scala - 以编程方式提交作业时 Spark EC2 集群上出现 java.io.EOFException

标签 scala amazon-ec2 apache-spark

真的需要你的帮助来理解我做错了什么。

我实验的目的是以编程方式运行 Spark 作业,而不是使用 ./spark-shell 或 ./spark-submit (这些都对我有用)

环境: 我使用 ./spark-ec2 脚本创建了一个包含 1 个主节点和 1 个工作节点的 Spark 集群

但是,当我尝试运行打包在 jar 中的代码时,集群看起来不错:

val logFile = "file:///root/spark/bin/README.md"

val conf = new SparkConf()
conf.setAppName("Simple App")
conf.setJars(List("file:///root/spark/bin/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar"))
conf.setMaster("spark://ec2-54-89-51-36.compute-1.amazonaws.com:7077")

val sc = new SparkContext(conf)

val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(_.contains("a")).count()
val numBs = logData.filter(_.contains("b")).count()
println(s"1. Lines with a: $numAs, Lines with b: $numBs")

我遇到异常:

*[info] Running com.paycasso.SimpleApp 
14/09/05 14:50:29 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/09/05 14:50:29 INFO SecurityManager: Changing view acls to: root
14/09/05 14:50:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/09/05 14:50:30 INFO Slf4jLogger: Slf4jLogger started
14/09/05 14:50:30 INFO Remoting: Starting remoting
14/09/05 14:50:30 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@ip-10-224-14-90.ec2.internal:54683]
14/09/05 14:50:30 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@ip-10-224-14-90.ec2.internal:54683]
14/09/05 14:50:30 INFO SparkEnv: Registering MapOutputTracker
14/09/05 14:50:30 INFO SparkEnv: Registering BlockManagerMaster
14/09/05 14:50:30 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140905145030-85cb
14/09/05 14:50:30 INFO MemoryStore: MemoryStore started with capacity 589.2 MB.
14/09/05 14:50:30 INFO ConnectionManager: Bound socket to port 47852 with id = ConnectionManagerId(ip-10-224-14-90.ec2.internal,47852)
14/09/05 14:50:30 INFO BlockManagerMaster: Trying to register BlockManager
14/09/05 14:50:30 INFO BlockManagerInfo: Registering block manager ip-10-224-14-90.ec2.internal:47852 with 589.2 MB RAM
14/09/05 14:50:30 INFO BlockManagerMaster: Registered BlockManager
14/09/05 14:50:30 INFO HttpServer: Starting HTTP Server
14/09/05 14:50:30 INFO HttpBroadcast: Broadcast server started at http://**.***.**.**:49211
14/09/05 14:50:30 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e2748605-17ec-4524-983b-97aaf2f94b30
14/09/05 14:50:30 INFO HttpServer: Starting HTTP Server
14/09/05 14:50:31 INFO SparkUI: Started SparkUI at http://ip-10-224-14-90.ec2.internal:4040
14/09/05 14:50:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/05 14:50:32 INFO SparkContext: Added JAR file:///root/spark/bin/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar at http://**.***.**.**:46491/jars/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar with timestamp 1409928632274
14/09/05 14:50:32 INFO AppClient$ClientActor: Connecting to master spark://ec2-54-89-51-36.compute-1.amazonaws.com:7077...
14/09/05 14:50:32 INFO MemoryStore: ensureFreeSpace(163793) called with curMem=0, maxMem=617820979
14/09/05 14:50:32 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 160.0 KB, free 589.0 MB)
14/09/05 14:50:32 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140905145032-0005
14/09/05 14:50:32 INFO AppClient$ClientActor: Executor added: app-20140905145032-0005/0 on worker-20140905141732-ip-10-80-90-29.ec2.internal-57457 (ip-10-80-90-29.ec2.internal:57457) with 2 cores
14/09/05 14:50:32 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140905145032-0005/0 on hostPort ip-10-80-90-29.ec2.internal:57457 with 2 cores, 512.0 MB RAM
14/09/05 14:50:32 INFO AppClient$ClientActor: Executor updated: app-20140905145032-0005/0 is now RUNNING
14/09/05 14:50:33 INFO FileInputFormat: Total input paths to process : 1
14/09/05 14:50:33 INFO SparkContext: Starting job: count at SimpleApp.scala:26
14/09/05 14:50:33 INFO DAGScheduler: Got job 0 (count at SimpleApp.scala:26) with 1 output partitions (allowLocal=false)
14/09/05 14:50:33 INFO DAGScheduler: Final stage: Stage 0(count at SimpleApp.scala:26)
14/09/05 14:50:33 INFO DAGScheduler: Parents of final stage: List()
14/09/05 14:50:33 INFO DAGScheduler: Missing parents: List()
14/09/05 14:50:33 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:26), which has no missing parents
14/09/05 14:50:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:26)
14/09/05 14:50:33 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/09/05 14:50:36 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-80-90-29.ec2.internal:36966/user/Executor#2034537974] with ID 0
14/09/05 14:50:36 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: ip-10-80-90-29.ec2.internal (PROCESS_LOCAL)
14/09/05 14:50:36 INFO TaskSetManager: Serialized task 0.0:0 as 1880 bytes in 8 ms
14/09/05 14:50:37 INFO BlockManagerInfo: Registering block manager ip-10-80-90-29.ec2.internal:59950 with 294.9 MB RAM
14/09/05 14:50:38 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/05 14:50:38 WARN TaskSetManager: Loss was due to java.io.EOFException
java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)*

我实际上正在做的是调用“sbt run”。所以我组装了 scala 项目并运行它。 顺便说一句,我在主主机上运行该项目,因此驱动程序对于工作主机来说肯定是可见的。 任何帮助表示赞赏。这很奇怪,这样一个简单的例子在集群中不起作用。我相信使用 ./spark-submit 并不方便。 提前致谢。

最佳答案

浪费了很多时间,我发现了问题所在。尽管我没有在我的应用程序中使用 hadoop/hdfs,但 hadoop 客户端很重要。问题出在 hadoop-client 版本中,它与构建 Spark 的 hadoop 版本不同。 Spark 的 hadoop 版本是 1.2.1,但在我的应用程序中是 2.4。

当我在应用程序中将 hadoop 客户端版本更改为 1.2.1 时,我可以在集群上执行 Spark 代码。

关于scala - 以编程方式提交作业时 Spark EC2 集群上出现 java.io.EOFException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25688940/

相关文章:

apache-spark - pyspark.sql.utils.AnalysisException : Column ambiguous but no duplicate column names

scala - Spark 应用程序如何开始使用 sbt run。

scala - Scala中两个数据框的架构比较

python - 如何在 aws ec2 实例上安装 python3.6

amazon-web-services - AWS-Auto Scaling 上的 AWS cloudwatch 自定义指标

python - 将 Spark DataFrame 从 Python 迁移到 Scala whithn Zeppelin

scala - 使用 spark 将 null 设置为 Hive 表中数字数据类型的值

scala - -Xlint :unsound-match flag do in Scala? 是什么

amazon-web-services - .ebextensions 删除 preinit 脚本

python - 在pyspark SQL DataFrame中乘以稀疏向量行