java - spark如何向集群发送作业?

标签 java scala apache-spark

我正在使用 YARN 并试图了解 Spark 如何使用 YARN 将作业发送到集群。因此,我深入研究了资源,发现当我们提交作业(例如 foreach)时,以下方法正在 SparkContext::runJob 中执行:

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

然后 JobWaiter 被创建在

DAGScheduler::submitJob 

并且正在发布 JobSubmitted 事件。该事件正在处理中

DAGSchedulerEventProcessLoop::handleJobSubmitted 

将另一个事件 (SparkListenerJobStart) 发布到监听器总线。然后调用

DAGScheduler::submitStage

所以在方法中似乎应该有将阶段提交到集群的逻辑。但我唯一看到的是:

  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

实际上,我希望那里有一些类似 NIO 的代码。

问题:如果我们使用YARN,驱动程序如何与集群通信?这段代码在哪里?有人可以帮助我吗?

最佳答案

如您所知,Spark 可以在多个集群管理器上运行。 Spark 通过使用名为 SchedulerBackend 的抽象来实现这一点。

对于 YARN,有两种实现方式:

  1. YarnClientSchedulerBackend(用于客户端部署模式)
  2. YarnSchedulerBackend(用于集群部署模式)

这是源代码:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

关于java - spark如何向集群发送作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46131277/

相关文章:

java - 如何用数据库实现一个完美的队列

scala - 为什么用提取器替换我的 Scala 案例类会破坏我的高阶函数?

apache-spark - PySpark - 时间戳行为

apache-spark - 根据 Spark 中的条件获取行索引

java - 类型注释位置删除

java - Action 图像捕捉导致观看时图像模糊

java - akka.remote.RemoteActorRefProvider 上的 ClassNotFoundException

scala - 使用 JDBC 创建 PostgreSQL 触发器

scala - Spark 不同,然后加入给出 IndexOutOfBoundsException

java - 在代码中用 Java 创建和初始化 `HashMap<Date, Date>` 的最佳方法?