我正在 EMR 中运行 Spark 结构化流作业(每天弹跳)。执行几个小时后,我的应用程序出现 OOM 错误并被杀死。以下是我的配置和spark SQL代码。 我是 Spark 新手,需要您的宝贵意见。
EMR 有 10 个实例,具有 16 核和 64GB 内存。
Spark-提交参数:
num_of_executors: 17
executor_cores: 5
executor_memory: 19G
driver_memory: 30G
作业正在以 30 秒的间隔从 Kafka 中以微批处理的形式读取输入。每批处理平均读取行数为 90k。
spark.streaming.kafka.maxRatePerPartition: 4500
spark.streaming.stopGracefullyOnShutdown: true
spark.streaming.unpersist: true
spark.streaming.kafka.consumer.cache.enabled: true
spark.hadoop.fs.s3.maxRetries: 30
spark.sql.shuffle.partitions: 2001
Spark SQL聚合代码:
dataset.groupBy(functions.col(NAME),functions.window(functions.column(TIMESTAMP_COLUMN),30))
.agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
.select(NAME,DEPS)
.map((row) -> {
Map<String, Object> map = Maps.newHashMap();
map.put(NAME, row.getString(0));
map.put(DEPS, row.getString(1));
return new KryoMapSerializationService().serialize(map);
}, Encoders.BINARY());
来自驱动程序的一些日志:
20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3, <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host: <host> (state: COMPLETE, exit status: 143)
顺便说一下,我在 forEachBatch 代码中使用了collectasList
List<Event> list = dataset.select("value")
.selectExpr("deserialize(value) as rows")
.select("rows.*")
.selectExpr(NAME, DEPS)
.as(Encoders.bean(Event.class))
.collectAsList();
最佳答案
使用这些设置,您可能会导致自己的问题。
num_of_executors: 17
executor_cores: 5
executor_memory: 19G
driver_memory: 30G
您基本上是在此处创建额外的容器,以便在它们之间进行洗牌。相反,从 10 个执行程序、15 个内核、60g 内存等开始。如果这有效,那么您可以尝试一下这些以尝试优化性能。我通常尝试将每个步骤将容器分成两半(但自 Spark 2.0 以来我也不需要这样做)。
让 Spark SQL 保持默认值 200。您分解得越多,Spark 计算洗牌的数学就越多。如果有的话,我会尝试使用与执行器相同数量的并行度,因此在本例中只有 10 个。当 2.0 发布时,这就是调整 Hive 查询的方式。 使工作变得复杂而分解将所有的负担都压在了主人身上。
使用数据集和编码通常也不如直接使用 DataFrame 操作那么高效。我发现将数据帧操作分解出来可以显着提高性能。
关于java - 如何确定Spark中shuffle分区的最佳数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61029267/