apache-spark - 通过Web应用程序启动Spark应用程序的最佳实践?

标签 apache-spark

我想通过Web应用程序向用户公开我的Spark应用程序。

基本上,用户可以决定要运行的 Action 并输入一些变量,这些变量需要传递给spark应用程序。
例如:用户输入一些字段,然后单击执行以下“使用参数min_x,max_x,min_y,max_y运行sparkApp1”的按钮。

应该使用用户指定的参数启动spark应用程序。完成后,可能需要Web应用程序检索结果(从hdfs或mongodb)并将其显示给用户。在处理过程中,Web应用程序应显示Spark应用程序的状态。

我的问题:

  • Web应用程序如何启动Spark应用程序?它可能能够从幕后的命令行启动它,但是可能会有更好的方法来执行此操作。
  • Web应用程序如何访问Spark应用程序的当前状态?从Spark WebUI的REST API获取状态是否可行?

  • 我正在运行带有YARN / Mesos(尚不确定)和MongoDB的Spark 1.6.1集群。

    最佳答案

    非常基本的答案:

    基本上,您可以使用SparkLauncher类启动Spark应用程序,并添加一些侦听器以监视进度。

    但是,您可能对Livy服务器感兴趣,该服务器是用于Spark作业的RESTful服务器。据我所知,齐柏林飞艇正在使用Livy提交工作并获取状态。

    您还可以使用Spark REST界面检查状态,信息将更加精确。 Here有一个示例如何通过REST API提交作业

    您有3个选择,答案是-自己检查;)这很大程度上取决于您的项目和要求。这两个主要选项:

  • SparkLauncher + Spark REST接口(interface)
  • Livy服务器

  • 应该对您有好处,您必须检查一下在项目中更容易使用和更好使用的功能

    扩展答案

    您可以根据需要和喜好以不同方式在应用程序中使用Spark。

    SparkLauncher

    SparkLauncherspark-launcher Artifact 中的类。就像从Spark Submit中一样,它用于启动已经准备好的Spark作业。

    典型用法是:

    1)使用Spark作业构建项目并将JAR文件复制到所有节点
    2)在您的客户端应用程序(即Web应用程序)中,创建指向准备好的JAR文件的SparkLauncher
    SparkAppHandle handle = new SparkLauncher()
        .setSparkHome(SPARK_HOME)
        .setJavaHome(JAVA_HOME)
        .setAppResource(pathToJARFile)
        .setMainClass(MainClassFromJarWithJob)
        .setMaster("MasterAddress
        .startApplication();
        // or: .launch().waitFor()
    
    startApplication创建SparkAppHandle,使您可以添加侦听器并停止应用程序。它还为getAppId提供了可能性。

    SparkLauncher应该与Spark REST API一起使用。您可以查询http://driverNode:4040/api/v1/applications/*ResultFromGetAppId*/jobs,您将获得有关应用程序当前状态的信息。

    Spark REST API

    也可以直接通过RESTful API提交Spark作业。用法与SparkLauncher非常相似,但是它是以纯RESTful方式完成的。

    请求示例-本文的鸣谢:
    curl -X POST http://spark-master-host:6066/v1/submissions/create --header "Content-Type:application/json;charset=UTF-8" --data '{
      "action" : "CreateSubmissionRequest",
      "appArgs" : [ "myAppArgument1" ],
      "appResource" : "hdfs:///filepath/spark-job-1.0.jar",
      "clientSparkVersion" : "1.5.0",
      "environmentVariables" : {
        "SPARK_ENV_LOADED" : "1"
      },
      "mainClass" : "spark.ExampleJobInPreparedJar",
      "sparkProperties" : {
        "spark.jars" : "hdfs:///filepath/spark-job-1.0.jar",
        "spark.driver.supervise" : "false",
        "spark.app.name" : "ExampleJobInPreparedJar",
        "spark.eventLog.enabled": "true",
        "spark.submit.deployMode" : "cluster",
        "spark.master" : "spark://spark-cluster-ip:6066"
      }
    }'
    

    此命令将提交ExampleJobInPreparedJar类中的作业以与给定的Spark Master群集。在响应中,您将拥有submissionId字段,这将有助于检查应用程序的状态-只需调用另一个服务:curl http://spark-cluster-ip:6066/v1/submissions/status/submissionIdFromResponse即可。就是这样,仅需编写代码

    Livy REST服务器和Spark作业服务器

    Livy REST ServerSpark Job Server是RESTful应用程序,允许您通过RESTful Web Service提交作业。两者与Spark的REST接口(interface)之间的主要区别是Livy和SJS不需要更早地准备作业并将其打包到JAR文件中。您只是提交将在Spark中执行的代码。

    用法很简单。代码取自Livy存储库,但进行了一些削减以提高可读性

    1)情况1:提交作业,该作业放置在本地计算机中
    // creating client
    LivyClient client = new LivyClientBuilder()
      .setURI(new URI(livyUrl))
      .build();
    
    try {
      // sending and submitting JAR file
      client.uploadJar(new File(piJar)).get();
      // PiJob is a class that implements Livy's Job
      double pi = client.submit(new PiJob(samples)).get();
    } finally {
      client.stop(true);
    }
    

    2)情况2:动态作业创建和执行
    // example in Python. Data contains code in Scala, that will be executed in Spark
    data = {
      'code': textwrap.dedent("""\
        val NUM_SAMPLES = 100000;
        val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
          val x = Math.random();
          val y = Math.random();
          if (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _);
        println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
        """)
    }
    
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    pprint.pprint(r.json()) 
    

    如您所见,既可以预编译作业,也可以对Spark进行临时查询。

    水圈雾

    另一个Spark即服务应用程序。 Mist非常简单,类似于Livy和Spark Job Server。

    用法非常非常相似

    1)创建作业文件:
    import io.hydrosphere.mist.MistJob
    
    object MyCoolMistJob extends MistJob {
        def doStuff(parameters: Map[String, Any]): Map[String, Any] = {
            val rdd = context.parallelize()
            ...
            return result.asInstance[Map[String, Any]]
        }
    } 
    

    2)将作业文件打包到JAR中
    3)向Mist发送请求:
    curl --header "Content-Type: application/json" -X POST http://mist_http_host:mist_http_port/jobs --data '{"path": "/path_to_jar/mist_examples.jar", "className": "SimpleContext$", "parameters": {"digits": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]}, "namespace": "foo"}'
    

    我在Mist中可以看到的一件事是,它具有通过MQTT对流作业的开箱即用的支持。

    Apache ·托瑞(Apache Toree)

    创建Apache Toree是为了使Spark易于交互分析。它不需要构建任何JAR。它通过IPython协议(protocol)运行,但不仅支持Python。

    当前的文档主要集中在Jupyter笔记本支持上,但是也有REST风格的API。

    比较和结论

    我列出了一些选择:
  • Spark启动器
  • Spark REST API
  • Livy REST服务器和Spark作业服务器
  • 水圈雾
  • Apache Toree

  • 它们都适用于不同的用例。我可以区分几个类别:
  • 需要作业的JAR文件的工具:Spark Launcher,Spark REST API
  • 用于交互式和预打包作业的​​工具:Livy,SJS,Mist
  • 专注于交互式分析的工具:Toree(但是可能对预打包的作业有一些支持;目前尚未发布任何文档)

  • SparkLauncher非常简单,是Spark项目的一部分。您正在用纯代码编写作业配置,因此它比JSON对象更容易构建。

    对于完全RESTful样式的提交,请考虑Spark REST API,Livy,SJS和Mist。其中三个是稳定的项目,其中有一些生产用例。 REST API还要求将作业预先打包,而Livy和SJS则不需要。但是请记住,默认情况下,每个Spark发行版中都包含Spark REST API,而Livy / SJS中则没有。我对Mist不太了解,但是-一段时间后-它应该是集成所有类型的Spark作业的非常好的工具。

    Toree专注于交互式工作。它仍在孵化中,但是即使现在您仍可以检查它的可能性。

    如果内置REST API,为什么还要使用自定义的附加REST服务?像Livy这样的SaaS是Spark的一个切入点。它管理Spark上下文,并且仅在一个节点上,而在群集之外的其他地方。它们还支持交互式分析。 Apache Zeppelin使用Livy将用户代码提交给Spark

    关于apache-spark - 通过Web应用程序启动Spark应用程序的最佳实践?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40300222/

    相关文章:

    scala - 值 toDS 不是 org.apache.spark.rdd.RDD 的成员

    scala - Spark -hbase-连接器 : ClusterId read in ZooKeeper is null

    apache-spark - Spark 的客户端模式下,驱动需要网络访问远程执行器?

    scala - 为什么提交作业失败并返回 "NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef;"?

    hadoop - yarn 上的 Spark 提交 - 多项工作

    hadoop - 执行 spark 作业时出现 FileNotFoundException

    java - 如何在不知道数据模式的情况下将数据从文本文件加载到 spark 数据框中?

    python - 创建一个包含每个文件的架构数据的数据框

    java - Spark在java中提交(SparkLauncher)

    scala - 错误 : value += is not a member of Long Scala