java - 如何确定Spark中shuffle分区的最佳数量

标签 java apache-spark apache-spark-sql hadoop-yarn spark-structured-streaming

我正在 EMR 中运行 Spark 结构化流作业(每天弹跳)。执行几个小时后,我的应用程序出现 OOM 错误并被杀死。以下是我的配置和spark SQL代码。 我是 Spark 新手,需要您的宝贵意见。

EMR 有 10 个实例,具有 16 核和 64GB 内存。

Spark-提交参数:

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

作业正在以 30 秒的间隔从 Kafka 中以微批处理的形式读取输入。每批处理平均读取行数为 90k。

  spark.streaming.kafka.maxRatePerPartition: 4500
  spark.streaming.stopGracefullyOnShutdown: true
  spark.streaming.unpersist: true
  spark.streaming.kafka.consumer.cache.enabled: true
  spark.hadoop.fs.s3.maxRetries: 30 
  spark.sql.shuffle.partitions: 2001

Spark SQL聚合代码:

dataset.groupBy(functions.col(NAME),functions.window(functions.column(TIMESTAMP_COLUMN),30))
            .agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
            .select(NAME,DEPS)
            .map((row) -> {
              Map<String, Object> map = Maps.newHashMap();
              map.put(NAME, row.getString(0));
              map.put(DEPS, row.getString(1));
              return new KryoMapSerializationService().serialize(map);
            }, Encoders.BINARY());

来自驱动程序的一些日志:

20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on  <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on  <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3,  <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host:  <host> (state: COMPLETE, exit status: 143)

顺便说一下,我在 forEachBatch 代码中使用了collectasList

  List<Event> list = dataset.select("value")
        .selectExpr("deserialize(value) as rows")
        .select("rows.*")
        .selectExpr(NAME, DEPS)
        .as(Encoders.bean(Event.class))
        .collectAsList();

最佳答案

使用这些设置,您可能会导致自己的问题。

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

您基本上是在此处创建额外的容器,以便在它们之间进行洗牌。相反,从 10 个执行程序、15 个内核、60g 内存等开始。如果这有效,那么您可以尝试一下这些以尝试优化性能。我通常尝试将每个步骤将容器分成两半(但自 Spark 2.0 以来我也不需要这样做)。

让 Spark SQL 保持默认值 200。您分解得越多,Spark 计算洗牌的数学就越多。如果有的话,我会尝试使用与执行器相同数量的并行度,因此在本例中只有 10 个。当 2.0 发布时,这就是调整 Hive 查询的方式。 使工作变得复杂而分解将所有的负担都压在了主人身上。

使用数据集和编码通常也不如直接使用 DataFrame 操作那么高效。我发现将数据帧操作分解出来可以显着提高性能。

关于java - 如何确定Spark中shuffle分区的最佳数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61029267/

相关文章:

java - 带有构造函数和方法的嵌套枚举?

java - 使用 native Mac OS X 菜单栏时出现 JMenuBar 问题

postgresql - Spark 流式传输多个源,重新加载数据帧

scala - 如何将数据帧转换为 JSON 并使用 key 写入 kafka 主题

mysql - Spark SQL 数据仓库

java - tomcat请求需要半天多,还在工作

java - 从键盘打印 switch case 语句

java - Spark Java 累加器不递增

scala - Spark:加入数组

mysql - 在 spark 中使用 JDBC 驱动程序限制与 MySQL 数据库的连接数