scala - YARN 因超出内存限制而杀死容器

标签 scala apache-spark apache-spark-sql emr amazon-emr

我遇到了 YARN 因超出内存限制而杀死我的容器的问题:

Container killed by YARN for exceeding memory limits. physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

我有 20 个 m3.2xlarge 节点,所以它们有:
cores: 8
memory: 30
storage: 200 gb ebs

我的应用程序的要点是,我有几个 100k Assets ,我有去年每小时生成的历史数据,未压缩的数据集总大小为 2TB。我需要使用这些历史数据为每个 Assets 生成预测。我的设置是我首先使用 s3distcp 将存储为索引 lzo 文件的数据移动到 hdfs。然后我将数据拉入并将其传递给 sparkSql 来处理 json:
 val files = sc.newAPIHadoopFile("hdfs:///local/*",
  classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text],conf)
val lzoRDD = files.map(_._2.toString)
val data = sqlContext.read.json(lzoRDD)

然后我使用 groupBy 按 Assets 对历史数据进行分组,创建一个 (assetId,timestamp,sparkSqlRow) 的元组。我认为在生成每个 Assets 的预测时,这种数据结构可以更好地进行内存操作。
 val p = data.map(asset =>  (asset.getAs[String]("assetId"),asset.getAs[Long]("timestamp"),asset)).groupBy(_._1)

然后我使用 foreach 迭代每一行,计算预测,最后将预测作为 json 文件写回 s3。
 p.foreach{ asset =>
  (1 to dateTimeRange.toStandardHours.getHours).foreach { hour =>
    // determine the hour from the previous year
    val hourFromPreviousYear = (currentHour + hour.hour) - timeRange
    // convert to seconds
    val timeToCompare = hourFromPreviousYear.getMillis
    val al = asset._2.toList

    println(s"Working on asset ${asset._1} for hour $hour with time-to-compare: $timeToCompare")
    // calculate the year over year average for the asset
    val yoy = calculateYOYforAsset2(al, currentHour, asset._1)
    // get the historical data for the asset from the previous year
    val pa = asset._2.filter(_._2 == timeToCompare)
      .map(row => calculateForecast(yoy, row._3, asset._1, (currentHour + hour.hour).getMillis))
      .foreach(json => writeToS3(json, asset._1, (currentHour + hour.hour).getMillis))
  }
}
  • 有没有更好的方法来实现这一点,这样我就不会遇到 YARN 的内存问题?
  • 有没有办法对 Assets 进行分块,以便 foreach 一次只处理大约 10k 项 Assets ,而不是所有 200k 项 Assets ?

  • 任何建议/帮助表示赞赏!

    最佳答案

    它不是你的代码。别担心 foreach不会同时运行所有这些 lambda。问题是 Spark 的默认值是 spark.yarn.executor.memoryOverhead (或最近在 2.3+ 中更名为 spark.executor.memoryOverhead )过于保守,这会导致您的执行程序在负载下被杀死。

    解决方案是(如错误消息所建议的)增加该值。如果您为每个执行程序请求大量内存,我会首先将其设置为 1GB(设置为 1024)或更多。目标是在不杀死任何执行程序的情况下运行作业。

    或者,如果您控制集群,则可以通过设置配置 yarn.nodemanager.pmem-check-enabled 来禁用 YARN 内存强制执行。和 yarn.nodemanager.vmem-check-enabledfalseyarn-site.xml

    关于scala - YARN 因超出内存限制而杀死容器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36939480/

    相关文章:

    Scala - 如何为逆变类定义工厂方法?

    apache-spark - 在 Spark 中执行 DataFrame 自连接的最简洁、最高效的语法

    sql - Spark Scala : Getting Cumulative Sum (Running Total) Using Analytical Functions

    apache-spark - 调用 LogisticRegressionModelWithLBFGS.train 时出现 Py4JavaError

    hadoop - Spark 中的任何 Oozie 等效功能

    scala - 无法从Sqoop创建的Spark中的序列文件创建数据框

    apache-spark-sql - Spark-sql Insert OVERWRITE追加数据而不是覆盖

    scala - TDD Scala 教程

    scala - 在 Scala 中保持推导的更高类型

    scala - 并行遍历文件中的行(Scala)?