scala - Spark cluster fails to allocate resources from a remote Scala application

Tags: scala hadoop apache-spark

So I've been trying to get Spark running with Scala. I wrote a simple test program that just extends the SparkPi example slightly:

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random

object PiTest {

  def main(args: Array[String]): Unit = {
    test()
  }

  def calcPi(spark: SparkContext, args: Array[String], numSlices: Long): Array[Double] = {
    val start = System.nanoTime()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(numSlices * slices, Int.MaxValue).toInt // avoid overflow
    // Monte Carlo estimate: count random points landing inside the unit circle
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    val piVal = 4.0 * count / n
    println("Pi is roughly " + piVal)
    spark.stop()
    val end = System.nanoTime()
    Array(piVal, end - start, (piVal - Math.PI) / Math.PI)
  }

  def test(): Unit = {
    val conf = new SparkConf().setAppName("Pi Test")
    conf.setSparkHome("/usr/local/spark")
    conf.setMaster("spark://<URL_OF_SPARK_CLUSTER>:7077")
    conf.set("spark.executor.memory", "512m")
    conf.set("spark.cores.max", "1")
    // Pin the ports Spark would otherwise pick at random
    conf.set("spark.blockManager.port", "33291")
    conf.set("spark.executor.port", "33292")
    conf.set("spark.broadcast.port", "33293")
    conf.set("spark.fileserver.port", "33294")
    conf.set("spark.driver.port", "33296")
    conf.set("spark.replClassServer.port", "33297")
    val sc = new SparkContext(conf)

    val pi = calcPi(sc, Array(), 1000)
    for (item <- pi) {
      println(item)
    }
  }
}

Then I made sure that ports 33291-33300 are open on my machine.
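As a sanity check that goes beyond "the ports are open", something like the following probe can be run from a worker node against the driver machine's address while the driver is up (PortProbe, its argument, and the 2-second timeout are illustrative, not part of the original post); a port that is open locally can still be unreachable from the cluster:

import java.net.{InetSocketAddress, Socket}

object PortProbe {
  def main(args: Array[String]): Unit = {
    val host = args(0) // the driver machine's address, as seen from this worker
    for (port <- 33291 to 33300) {
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(host, port), 2000) // 2 second timeout
        println(s"$host:$port reachable")
      } catch {
        case e: Exception => println(s"$host:$port NOT reachable: ${e.getMessage}")
      } finally {
        socket.close()
      }
    }
  }
}

Note that a port only shows as reachable while a process is actually listening on it, so the driver ports can only be probed while the Spark application is running.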

When I run the program, it successfully hits the Spark cluster and appears to allocate cores:

[screenshot: Spark master UI showing the application registered with cores allocated]

But when the program gets to the point of actually running the Hadoop job, the application logs show:

15/12/07 11:50:21 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at BotDetector.scala:49), which has no missing parents
15/12/07 11:50:21 INFO MemoryStore: ensureFreeSpace(1840) called with curMem=0, maxMem=2061647216
15/12/07 11:50:21 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1840.0 B, free 1966.1 MB)
15/12/07 11:50:21 INFO MemoryStore: ensureFreeSpace(1194) called with curMem=1840, maxMem=2061647216
15/12/07 11:50:21 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1194.0 B, free 1966.1 MB)
15/12/07 11:50:21 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.5.106:33291 (size: 1194.0 B, free: 1966.1 MB)
15/12/07 11:50:21 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:874
15/12/07 11:50:21 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at BotDetector.scala:49)
15/12/07 11:50:21 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/12/07 11:50:36 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:50:51 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:51:06 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:51:21 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:51:36 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:51:51 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:52:06 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:52:21 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/12/07 11:52:22 INFO AppClient$ClientActor: Executor updated: app-20151207175020-0003/0 is now EXITED (Command exited with code 1)
15/12/07 11:52:22 INFO SparkDeploySchedulerBackend: Executor app-20151207175020-0003/0 removed: Command exited with code 1
15/12/07 11:52:22 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
15/12/07 11:52:22 INFO AppClient$ClientActor: Executor added: app-20151207175020-0003/1 on worker-20151207173821-10.240.0.7-33295 (10.240.0.7:33295) with 5 cores
15/12/07 11:52:22 INFO SparkDeploySchedulerBackend: Granted executor ID app-20151207175020-0003/1 on hostPort 10.240.0.7:33295 with 5 cores, 512.0 MB RAM
15/12/07 11:52:22 INFO AppClient$ClientActor: Executor updated: app-20151207175020-0003/1 is now LOADING
15/12/07 11:52:23 INFO AppClient$ClientActor: Executor updated: app-20151207175020-0003/1 is now RUNNING
15/12/07 11:52:36 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

When I go onto the remote server and look at the worker logs, they say:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hduser/apache-tez-0.7.0-src/tez-dist/target/tez-0.7.0/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/12/07 17:50:21 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/12/07 17:50:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/07 17:50:21 INFO spark.SecurityManager: Changing view acls to: hduser,jschirmer
15/12/07 17:50:21 INFO spark.SecurityManager: Changing modify acls to: hduser,jschirmer
15/12/07 17:50:21 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser, jschirmer); users with modify permissions: Set(hduser, jschirmer)
15/12/07 17:50:22 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/12/07 17:50:22 INFO Remoting: Starting remoting
15/12/07 17:50:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.240.0.7:33292]
15/12/07 17:50:22 INFO util.Utils: Successfully started service 'driverPropsFetcher' on port 33292.
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:65)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:146)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:245)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:97)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:159)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:65)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        ... 4 more
15/12/07 17:52:22 INFO util.Utils: Shutdown hook called

I've already tried setting the driver and executor ports to explicitly opened ports, with the same result. It's not clear what the problem is. Does anyone have any suggestions?

Also, note that if I compile this exact same code into a fat jar, copy it to the remote server, and run it through spark-submit, it runs successfully. I do have a YARN configuration on my server, and I would be willing to run Spark on YARN, but my understanding is that this can't be done from a remote machine, since you specify the master as yarn-cluster and there is nowhere to put the host into the configuration.
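For contrast, a sketch of what the configuration presumably reduces to when the jar is launched on the cluster itself (the jar name and class name are illustrative): spark-submit supplies the master URL and Spark home, and since the driver then runs inside the cluster network, none of the port plumbing above is needed for the executors to reach it.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: conf as it would look when launched on the cluster via, e.g.,
//   spark-submit --master spark://<URL_OF_SPARK_CLUSTER>:7077 --class PiTest pi-test-assembly.jar
val conf = new SparkConf().setAppName("Pi Test")
val sc = new SparkContext(conf)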

Best Answer

It looks like a firewall problem. First, check that all the required ports are open across your cluster. Beyond that, Spark uses some random ports by default, so you need to pin those to fixed values for your cluster before you can use Spark remotely.
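A minimal sketch of what pinning those ports could look like, assuming the Spark 1.x property names already used in the question. spark.driver.host is an addition not mentioned in the answer, but a driver machine with several network interfaces (or behind NAT) advertising an unreachable callback address would produce exactly the "Futures timed out" seen in the worker log above:

import org.apache.spark.SparkConf

// Sketch, not a verified fix: pin every otherwise-random port and advertise a
// driver address the workers can actually route to.
val conf = new SparkConf()
  .setAppName("Pi Test")
  .setMaster("spark://<URL_OF_SPARK_CLUSTER>:7077")
  .set("spark.driver.host", "<ROUTABLE_IP_OF_DRIVER>") // assumption: not set in the question
  .set("spark.driver.port", "33296")
  .set("spark.blockManager.port", "33291")
  .set("spark.executor.port", "33292")
  .set("spark.broadcast.port", "33293")
  .set("spark.fileserver.port", "33294")
  .set("spark.replClassServer.port", "33297")
// The same properties should also be pinned on each worker (for example in
// conf/spark-defaults.conf) and the firewall opened in both directions:
// executors dial back to the driver, not only the driver to the cluster.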

For "scala - Spark cluster fails to allocate resources from a remote Scala application", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34095200/
