我有以下代码在大多数情况下会触发 hiveContext.sql()
。我的任务是我想创建几个表并在处理完所有配置单元表分区后将值插入。
所以我首先触发 show partitions
并在 for 循环中使用它的输出,我调用了一些方法来创建表(如果它不存在)并使用 hiveContext.sql
。
现在,我们不能在执行器中执行hiveContext
,所以我必须在驱动程序的for循环中执行它,并且应该一个接一个地串行运行。当我在 YARN 集群中提交此 Spark 作业时,几乎所有时间我的执行程序都因为未找到 shuffle 异常而丢失。
现在发生这种情况是因为 YARN 由于内存过载而杀死了我的执行程序。我不明白为什么,因为我为每个配置单元分区都有一个非常小的数据集,但它仍然会导致 YARN 杀死我的执行程序。
以下代码是否会并行执行所有操作并尝试同时容纳内存中的所有 hive 分区数据?
public static void main(String[] args) throws IOException {
SparkConf conf = new SparkConf();
SparkContext sc = new SparkContext(conf);
HiveContext hc = new HiveContext(sc);
DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")");
Row[] rowArr = partitionFrame.collect();
for(Row row : rowArr) {
String[] splitArr = row.getString(0).split("/");
String server = splitArr[0].split("=")[1];
String date = splitArr[1].split("=")[1];
String csvPath = "hdfs:///user/db/ext/"+server+".csv";
if(fs.exists(new Path(csvPath))) {
hiveContext.sql("ADD FILE " + csvPath);
}
createInsertIntoTableABC(hc,entity, date);
createInsertIntoTableDEF(hc,entity, date);
createInsertIntoTableGHI(hc,entity,date);
createInsertIntoTableJKL(hc,entity, date);
createInsertIntoTableMNO(hc,entity,date);
}
}
最佳答案
通常,您应该始终深入日志以找出真正的异常(至少在 Spark 1.3.1 中)。
tl;博士
Yarn 下 Spark 的安全配置
spark.shuffle.memoryFraction=0.5
- 这将允许 shuffle 使用更多分配的内存
spark.yarn.executor.memoryOverhead=1024
- 以 MB 为单位。当 Yarn 的内存使用量大于(executor-memory + executor.memoryOverhead)时, yarn 会杀死执行者
更多信息
通过阅读您的问题,您提到您得到 shuffle not found 异常。
如果发生
org.apache.spark.shuffle.MetadataFetchFailedException:缺少 shuffle 的输出位置
您应该增加 spark.shuffle.memoryFraction
,例如增加到 0.5
Yarn 杀死我的执行程序的最常见原因是内存使用超出预期。
为避免您增加 spark.yarn.executor.memoryOverhead
,我将其设置为 1024,即使我的执行程序仅使用 2-3G 内存。
关于memory - 如何避免 Spark executor 丢失以及由于内存限制而导致 yarn 容器杀死它?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31840492/