java - How to convert Spark Client submitApplication to the YARN REST API?

Tags: java rest apache-spark hadoop yarn

Currently I have a working code implementation that submits applications to YARN using spark.deploy.yarn.Client. Aggregating all the arguments this Client requires is complicated, but submitting the application itself is simple:

ClientArguments cArgs = new ClientArguments(args.toArray(new String[0]));
client = new Client(cArgs, sparkConf);
applicationID = client.submitApplication();

Most of the code before that point is spent accumulating the sparkConf and args. Now I would like to retire the Client and use REST only. YARN exposes a full REST API, including application submission; according to the YARN documentation, it should be a matter of a simple JSON/XML POST:
POST http://<rm http address:port>/ws/v1/cluster/apps
Accept: application/json
Content-Type: application/json
{
  "application-id":"application_1404203615263_0001",
  "application-name":"test",
  "am-container-spec":
{
  "local-resources":
  {
    "entry":
    [
      {
        "key":"AppMaster.jar",
        "value":
        {
          "resource":"hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
          "type":"FILE",
          "visibility":"APPLICATION",
          "size": 43004,
          "timestamp": 1405452071209
        }
      }
    ]
  },
  "commands":
  {
    "command":"{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
  },
  "environment":
  {
    "entry":
    [
      {
        "key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
        "value": "1405459400754"
      },
      {
        "key": "CLASSPATH",
        "value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
      },
      {
        "key": "DISTRIBUTEDSHELLSCRIPTLEN",
        "value": "6"
      },
      {
        "key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
        "value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
      }
    ]
  }
},
"unmanaged-AM":false,
"max-app-attempts":2,
"resource":
{
  "memory":1024,
  "vCores":1
},
"application-type":"YARN",
"keep-containers-across-application-attempts":false,
"log-aggregation-context":
{
  "log-include-pattern":"file1",
  "log-exclude-pattern":"file2",
  "rolled-log-include-pattern":"file3",
  "rolled-log-exclude-pattern":"file4",
  "log-aggregation-policy-class-name":"org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy",
  "log-aggregation-policy-parameters":""
},
"attempt-failures-validity-interval":3600000,
"reservation-id":"reservation_1454114874_1",
"am-black-listing-requests":
{
  "am-black-listing-enabled":true,
  "disable-failure-threshold":0.01
}
}

I tried to translate my arguments into the JSON body of such a POST request, but it does not seem straightforward. Does anyone know whether I can reverse-engineer the JSON payload to send over REST from a running application that was submitted by the Client? Or what mapping I can use to take the Client arguments and place them into the JSON?

Best answer

After some digging, I managed to submit an application using only the REST API. It is not a well-documented process, so I am posting it here.

Note: if at any point you want to compare the content of your request with the one the Client sends, set a debug breakpoint and inspect the application submission context the Client builds.
Open the class org.apache.hadoop.yarn.client.api.impl.YarnClientImpl and go to the method submitApplication(ApplicationSubmissionContext appContext).

First of all, to replace spark.deploy.yarn.Client with a REST API request, the solution has to make sure that every file mentioned in the configuration is available on HDFS.
Later, it also has to create and upload an additional file named __spark_conf__.zip.
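One thing the steps below take for granted is an applicationId (it appears in the staging path and in the request body). The original flow got it from the Client; without the Client, my assumption is that you fetch one from the ResourceManager's Cluster New Application endpoint. A minimal sketch, using the same Apache HttpClient as the submission code further down (the rmUri value and the helper name are placeholders of mine):

    // Sketch (not part of the original flow): ask the ResourceManager for a fresh
    // application-id via POST /ws/v1/cluster/apps/new-application.
    // rmUri is assumed to be something like "http://rm-host:8088/ws/v1/cluster/apps".
    private String requestNewApplication(String rmUri) throws IOException {
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost request = new HttpPost(rmUri + "/new-application");
        HttpResponse response = client.execute(request);
        // The JSON body contains "application-id" (and the maximum resource capability);
        // parse the id out with whatever JSON library you already use.
        return EntityUtils.toString(response.getEntity());
    }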

Step 1

Go over the files referenced in the SparkConf (the Client's second argument): the files listed under the "AllJars" tag, the file under "mainJarPath", and the files under "FilesList".

For each file, check whether it already exists in HDFS; if it does not, upload it from the local machine. For each file, fetch its FileStatus from HDFS.
Aggregate a resource list, which is a map of properties per file with the following six attributes (a short sketch of this per-file handling follows the list):

  • size = getLen()
  • timestamp = getModificationTime()
  • type = FILE
  • visibility = PUBLIC
  • plus two more attributes: key and resource.
  • For files from the allJars list: the key is __spark_libs__/{{filename}} and the resource is the file name.
  • For files from the FilesList: the key is the "localEntry" tag and the resource is the "hdfsPath" tag.
  • For the file from mainJarPath: the key is "__app__.jar" and the resource is the file name.
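A minimal sketch of the per-file handling, using the Hadoop FileSystem API (the method name, getHdfs() and the exact key/path handling are assumptions of mine, not code from the original flow):

    // Sketch: ensure one file is on HDFS, then build its six-attribute resource map.
    private Map<String, String> toLocalResource(String key, Path hdfsPath, String localPath) throws IOException {
        FileSystem fs = getHdfs();                                    // org.apache.hadoop.fs.FileSystem
        if (!fs.exists(hdfsPath)) {
            fs.copyFromLocalFile(new Path(localPath), hdfsPath);      // upload from the local machine if missing
        }
        FileStatus status = fs.getFileStatus(hdfsPath);
        Map<String, String> resource = new HashMap<>();
        resource.put("key", key);                                     // e.g. "__spark_libs__/" + file name, or "__app__.jar"
        resource.put("resource", hdfsPath.toString());
        resource.put("size", String.valueOf(status.getLen()));
        resource.put("timestamp", String.valueOf(status.getModificationTime()));
        resource.put("type", "FILE");
        resource.put("visibility", "PUBLIC");
        return resource;
    }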

Step 2

Create the __spark_conf__.zip file. You can create it directly at the staging path in HDFS (normally {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip).
This archive contains two files and one empty directory: one file __spark_hadoop_conf__.xml (which is core-site.xml renamed), and a second file called __spark_conf__.properties, which is a slightly modified version of the sparkConf section of the configuration.

To create __spark_conf__.properties you need to read the JSON map under "sparkConf" -> "org$apache$spark$SparkConf$$settings" and convert each entry from the JSON form "spark.safemine.addcontrol.driverMemory": "5120M" to the properties form spark.safemine.addcontrol.driverMemory=5120M.
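A minimal sketch of that conversion, assuming the settings have already been read into a Map<String,String> (the map name sparkConfSettings is mine); the resulting string is the same properties variable that the snippet below keeps appending to:

    // Sketch: turn the "org$apache$spark$SparkConf$$settings" map into key=value lines.
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> setting : sparkConfSettings.entrySet()) {
        // "spark.safemine.addcontrol.driverMemory": "5120M"  ->  spark.safemine.addcontrol.driverMemory=5120M
        sb.append(setting.getKey()).append("=").append(setting.getValue()).append("\n");
    }
    String properties = sb.toString();   // the six spark.yarn.cache.* lines are appended next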

At the bottom of the file, add these 6 lines:
  • spark.yarn.cache.confArchive={{the location in sparkStaging you uploaded __spark_conf__.zip to}}
  • spark.yarn.cache.visibilities={{all the visibilities of the files, comma-separated - basically "PUBLIC,PUBLIC,...,PUBLIC"}}
  • spark.yarn.cache.timestamps={{all the timestamps of the files, comma-separated}}
  • spark.yarn.cache.types={{all the types of the files, comma-separated - basically "FILE,FILE,...,FILE"}}
  • spark.yarn.cache.filenames={{all the file names and keys, recorded as resource#key and comma-separated}}
  • spark.yarn.cache.sizes={{all the sizes of the files, comma-separated}}

Make sure the five aggregated lines are compiled in matching order across the lists. I used the following code:
        // hdfs, userName, applicationId, the localResources list built in step 1,
        // and the properties string assembled above are all assumed to be in scope.
        String confArchive = "spark.yarn.cache.confArchive="+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
        String filenames = "spark.yarn.cache.filenames=";
        String sizes = "spark.yarn.cache.sizes=";
        String timestamps = "spark.yarn.cache.timestamps=";
        String types = "spark.yarn.cache.types=";
        String visibilities = "spark.yarn.cache.visibilities=";
        // Walk the resource list once so all five aggregated lines stay in the same order.
        for (Map<String,String> localResource : localResources) {
            filenames+=localResource.get("resource")+"#"+localResource.get("key")+",";
            sizes+=localResource.get("size")+",";
            timestamps+=localResource.get("timestamp")+",";
            types+=localResource.get("type")+",";
            visibilities+=localResource.get("visibility")+",";
        }
        // Append the six lines, trimming the trailing comma from each aggregated list.
        properties+=confArchive+"\n";
        properties+=filenames.substring(0,filenames.length()-1)+"\n";
        properties+=sizes.substring(0,sizes.length()-1)+"\n";
        properties+=timestamps.substring(0,timestamps.length()-1)+"\n";
        properties+=types.substring(0,types.length()-1)+"\n";
        properties+=visibilities.substring(0,visibilities.length()-1)+"\n";
    
The __spark_hadoop_conf__.xml file is simply core-site.xml renamed; the folder created alongside the two files is named __hadoop_conf__ and stays empty.

You can write the file directly to HDFS, like this:
    // Writes __spark_conf__.zip straight into the HDFS staging directory.
    // hdfs (the base URI) and getHdfs() (the org.apache.hadoop.fs.FileSystem) are class members.
    private void generateSparkConfInHdfs(String applicationId, String userName, String sparkConfProperties, String sparkHadoopConf) throws IOException {
        String path = hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
        Path hdfsPath = new Path(path);
        ZipOutputStream os = new ZipOutputStream(getHdfs().create(hdfsPath));
        os.putNextEntry(new ZipEntry("__hadoop_conf__/"));            // the empty directory
        os.putNextEntry(new ZipEntry("__spark_conf__.properties"));   // the properties assembled above
        os.write(sparkConfProperties.getBytes(),0,sparkConfProperties.getBytes().length);
        os.putNextEntry(new ZipEntry("__spark_hadoop_conf__.xml"));   // core-site.xml, renamed
        os.write(sparkHadoopConf.getBytes(),0,sparkHadoopConf.getBytes().length);
        os.close();
    }
    

Once the file is created, add it to the resource list with the following attributes:
  • size = getLen()
  • timestamp = getModificationTime()
  • type = ARCHIVE
  • visibility = PRIVATE
  • key = __spark_conf__
  • resource is the staging location (normally {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip).

Go over the complete resource list and, using the values we collected in place of the {{ }} placeholders, create an XML/JSON fragment for each resource with this structure:
        <entry>
            <key>{{key}}</key>
            <value>
                <resource>{{resource}}</resource>
                <size>{{size}}</size>
                <timestamp>{{timestamp}}</timestamp>
                <type>{{type}}</type>
                <visibility>{{visibility}}</visibility>
            </value>
        </entry>
    

The accumulated string will be the localResources XML segment used in the request below.
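A minimal sketch of that accumulation (the StringBuilder loop and variable names are mine; the tag structure is exactly the one above):

    // Sketch: one <entry> per resource map, concatenated into the localResources segment.
    StringBuilder localResourcesXml = new StringBuilder();
    for (Map<String, String> res : localResources) {
        localResourcesXml.append("<entry>")
            .append("<key>").append(res.get("key")).append("</key>")
            .append("<value>")
            .append("<resource>").append(res.get("resource")).append("</resource>")
            .append("<size>").append(res.get("size")).append("</size>")
            .append("<timestamp>").append(res.get("timestamp")).append("</timestamp>")
            .append("<type>").append(res.get("type")).append("</type>")
            .append("<visibility>").append(res.get("visibility")).append("</visibility>")
            .append("</value>")
            .append("</entry>");
    }
    String localResourcesSegment = localResourcesXml.toString();   // plugged into <local-resources> in step 4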

Step 3

Generate the Java command. You will need to extract a few elements from the SparkConf:
  • driverMemory - from the property of the same name in the sparkConf
  • extraJavaOptions - from spark.driver.extraJavaOptions in the properties collection
  • mainClass - from the property of the same name in the sparkConf
  • argstr - all the ClientArgs collected together, except --class

The resulting command, with those elements plugged in, is:
    String command = "$JAVA_HOME/bin/java -server -Xmx"+driverMemory+" -Djava.io.tmpdir=$PWD/tmp "+extraJavaOptions+" -Dspark.yarn.app.container.log.dir=<LOG_DIR> "
                + "org.apache.spark.deploy.yarn.ApplicationMaster --class "+mainClass+" "+argstr+" "
                + "--properties-file $PWD/__spark_conf__/__spark_conf__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr";
    

Step 4

Assemble the request XML.

Note: my implementation requires a node label on the AM container, hence the am-container-node-label-expression element; this will not apply in every deployment.

The mapping from the sparkConf to the REST request is (shown here as XML; a JSON implementation is also supported):
    <application-submission-context>
        <application-id>"+applicationId+"</application-id> 
        <application-name>"+appName+"</application-name>
        <queue>default</queue>
        <priority>0</priority>
        <am-container-spec>
           <local-resources>+localResources+</local-resources>
           <environment>
              <entry>
                 <key>SPARK_YARN_STAGING_DIR</key>
                 <value>"+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"</value>
              </entry>
              <entry>
                 <key>CLASSPATH</key>
                 <value>$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:/spark-non-hdfs-storage/spark-assembly-2.3.0-hadoop2.7/*:%HADOOP_CONF_DIR%:%HADOOP_COMMON_HOME%/share/hadoop/common/*:%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__</value>
              </entry>
              <entry>
                 <key>SPARK_USER</key>
                 <value>"+userName+"</value>
              </entry>
           </environment>
           <commands>
              <command>"+command+"</command>
           </commands>
        </am-container-spec>
        <unmanaged-AM>false</unmanaged-AM>
        <max-app-attempts>1</max-app-attempts>
        <resource>
          <memory>5632</memory>
          <vCores>1</vCores>
        </resource>
        <application-type>SPARK</application-type>
        <keep-containers-across-application-attempts>false</keep-containers-across-application-attempts>
        <application-tags>
          <tag>"+sparkYarnTag+"</tag>
        </application-tags>
        <am-container-node-label-expression>appMngr</am-container-node-label-expression>
        <log-aggregation-context/>
        <attempt-failures-validity-interval>1</attempt-failures-validity-interval>
        <reservation-id/>
    </application-submission-context> 
    

Step 5

Submit the application with an HTTP POST to the REST API:
    private void submitApplication (String body, String userName) throws SMSparkManagerException {
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost request = new HttpPost(uri+"?user.name="+userName);
        try {
            request.setEntity(new StringEntity(body, ContentType.APPLICATION_XML));
            HttpResponse response = client.execute(request);
            if (response.getStatusLine().getStatusCode()!=202) {
                throw new SMSparkManagerException("The application could not be submitted to Yarn, response http code "+response.getStatusLine().getStatusCode());
            }
        } catch (UnsupportedEncodingException e) {
            logger.error("The application Could not be submitted due to UnsupportedEncodingException in the provided body: "+body, e );
            throw new SMSparkManagerException("Error in submitting application to yarn");
        } catch (ClientProtocolException e) {
            logger.error("The application Could not be submitted due to ClientProtocolException", e);
            throw new SMSparkManagerException("Error in submitting application to yarn");
        } catch (IOException e) {
            logger.error("The application Could not be submitted due to IOException", e);
            throw new SMSparkManagerException("Error in submitting application to yarn");
        }
    }
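For completeness, a sketch of the call site (the uri field and the endpoint value are assumptions on my side):

    // uri is assumed to point at the ResourceManager submission endpoint,
    // e.g. "http://rm-host:8088/ws/v1/cluster/apps"; body is the XML assembled in step 4.
    submitApplication(body, userName);
    // A 202 Accepted response means YARN accepted the submission; the application can then
    // be followed in the ResourceManager UI or through its REST API.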
    

This question and answer on Stack Overflow: https://stackoverflow.com/questions/53691069/
