java - 在spark-jobserver上运行基于Java的Spark作业

标签 java scala apache-spark spark-jobserver

我需要使用 spark-jobserver 使用低延迟上下文运行聚合 Spark 作业。我有这个 Scala 运行程序来使用 Java 类中的 Java 方法来运行作业。

object AggregationRunner extends SparkJob {
  def main(args: Array[String]) {
    val ctx = new SparkContext("local[4]", "spark-jobs")
    val config = ConfigFactory.parseString("")
    val results = runJob(ctx, config)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    SparkJobValid;
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    val context = new JavaSparkContext(sc)
    val aggJob = new ServerAggregationJob()
    val id = config.getString("input.string").split(" ")(0)
    val field = config.getString("input.string").split(" ")(1)
    return aggJob.aggregate(context, id, field)
  }
}

但是,我收到以下错误。我尝试取出Java方法中返回的内容,现在只是返回一个测试字符串,但它仍然不起作用:

{
  "status": "ERROR",
  "result": {
    "message": "Ask timed out on [Actor[akka://JobServer/user/context-supervisor/single-context#1243999360]] after [10000 ms]",
    "errorClass": "akka.pattern.AskTimeoutException",
    "stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)", "akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)", "scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)", "akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)", "akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)", "akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)", "java.lang.Thread.run(Thread.java:745)"]
  }
}

我不太确定为什么会超时,因为我只返回一个字符串。

编辑

所以我发现问题的发生是因为我使用的是在更新 JAR 之前创建的 Spark 上下文。但是,现在我尝试在 Spark 作业中使用 JavaSparkContext,它返回到上面显示的错误。

什么是消除错误的永久方法。

此外,我在本地 Docker 容器上运行繁重的 Spark 作业是否是超时的合理原因?

最佳答案

要解决询问超时问题,请在作业服务器配置文件中添加/更改以下属性。

spray.can.server {
idle-timeout = 210 s
request-timeout = 200 s
}

有关更多信息,请查看此 https://github.com/spark-jobserver/spark-jobserver/blob/d1843cbca8e0d07f238cc664709e73bbeea05f2c/doc/troubleshooting.md

关于java - 在spark-jobserver上运行基于Java的Spark作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35928508/

相关文章:

java - 使用Spark从Hadoop读取JSON文件

java - 我可以在 Intellij Idea 中通过 @Around 方面进行调试吗

scala - 如何使用系统属性替换 Typesafe Config 文件中的占位符?

scala - Scala-ScheduledFuture

scala - 使用 Apache Spark 提取 kmeans 集群信息

scala - Spark on HDInsights - 方案 : adl 没有文件系统

java - 通过时间戳创建,让数据库管理它更好,还是让应用程序管理更好?

java - 代码中的类不变量

java - 使用 BIRT 创建报告

scala - SBT:在编译期间查看类文件摘要?