scala - 应用程序中的 Spark 调度 : performance issue

标签 scala apache-spark apache-spark-sql spark-streaming databricks

我像这样实现了 Apache Spark 调度(Scala 代码):

// group into list of 10 items...
val maxSimultaneousSubmitAndMonitorThreadsInDriver = 10

// ... in order to throttle the number of threads submitting and monitoring apps at a time
val lists = myList grouped maxSimultaneousThreadsInDriver 

for (aList <- lists) {

   // pick a list, then convert it to Scala Parallel list
   aList.par.foreach { // so 10 threads MAX at a time, that can handle job submission and monitoring
      case (file_name) => {

        // in each driver thread, create different Spark session
        val sparkChild = sparkMain.newSession()

        // then do specific stuff with such session
        val childDF = sparkChild.read.parquet( filename + "_directory/*.parquet") 
        ...
     }
   }

}

正如您所知,调度的概念在这样的单个驱动程序实例中可以监控多个 Spark 应用程序。所以我可以同时运行多个 Spark 应用程序 . (在我的情况下,每个 Spark 应用程序都可以根据名称,根据业务规则,对读取的每个文件执行非常具体的任务)。

调度程序默认配置为 FIFO 模式:

By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away [...]



这样的解决方案对我有用。 但是我发现 Spark 调度有点慢 .例如,当我看到 Spark UI Executors 选项卡时,我可以看到大部分时间只使用了几个核心。

With Spark Scheduling Within mode, cluster resources seems clearly underutilized

这与我拥有的几乎所有时间自然完全消耗 CPU 的经典 Spark 应用程序相反!

所以我的最后一个问题是,如何优化 Spark Scheduling Within 的性能?

我试过的:
  • 更改 maxSimultaneousSubmitAndMonitorThreadsInDriver ,为了限制在给定时间提交和监控应用程序的线程数
  • 试图增加 spark.scheduler.listenerbus.eventqueue.capacity
  • 试图增加/减少spark.default.parallelism
  • 试图增加/减少spark.sql.shuffle.partitions

  • 如果我增加可以同时提交和监控 Spark 应用程序的线程数(使用节流系统),我最终会出现 OOM。

    关于spark.default.parallelismspark.sql.shuffle.partitions ,我不知道如何选择相关值。如果我不进行调度(每个驱动程序只有一个应用程序),我设置的值可能是 192(内核数)以获得良好的结果。

    但是对于Scheduling Within,尚不清楚。每个提交的作业都很小,每个作业的并行度 192 似乎有点矫枉过正(而且很慢?)。

    任何投入将不胜感激

    最佳答案

    首先,您定义 maxSimultaneousSubmitAndMonitorThreadsInDriver=10然后使用 maxSimultaneousThreadsInDriver而不是你刚刚宣布的那个,这是故意的吗?

    其次,尝试删除行val sparkChild = sparkMain.newSession()并将下一行更改为 val childDF = sparkMain.read.parquet( filename + "_directory/*.parquet")相反,它编译吗?如果它确实保持这种方式并再次检查。

    您是否尝试增加执行者数量?
    如果参数已存在于您的 spark-submit 中,请添加或更改 --num-executors 20 , 如果通过代码创建上下文添加 conf.set("spark.executor.instances", 20)就在 new SparkContext(conf) 之前在你的代码行。
    现在再次运行,它会提高性能吗?如果是但不够增加到 40。
    如果您仍然卡住,请继续阅读。

    默认的 Spark 作业运行行为是 FIFO,即第一个作业将被优先处理,只有在有可用资源时才执行后面的作业 第一个作业释放资源。
    我猜你只得到 14 个任务(每个执行程序 7 个),因为你的文件非常小,如果任务运行得非常快,那么重新分区不会解决问题,但允许并行作业会。
    由于您正在寻找作业之间的并行性,我建议您使用 FAIR 调度程序并为您创建的每个线程/作业分配不同的池。

    通过添加到您的 spark-submit --conf spark.scheduler.mode=FAIR 为您的 spark 应用程序配置 FAIR 共享, 如果通过代码创建上下文添加 conf.set("spark.scheduler.mode", FAIR)就在 new SparkContext(conf) 之前在你的代码行。

    在线程内执行任何作业之前分配随机池名称(您可以使用线程 ID,但即使对于相同的线程,建议为每个作业使用不同的池名称):

    val randomString = scala.util.Random.alphanumeric.take(10).mkString("")
    sparkMaster.setLocalProperty("spark.scheduler.pool", randomString)
    val childDF = sparkMaster.read.parquet( filename + "_directory/*.parquet") 
    

    现在 FAIR 共享应该启动并在线程之间平均分配资源。
    如果您仍然看到较低的内核使用率,请尝试在不针对 OOM 的情况下将最大线程池容量最大化。
    如果它仍然很慢,请考虑重新分区到 (max_cores / max_threads) ,在您的情况下(看到 2 个具有 192 个可用内核的执行程序,即总共 384 384/10=38 ,因此 repartition(38) 可能会有所帮助。

    引用:https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

    关于scala - 应用程序中的 Spark 调度 : performance issue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60480033/

    相关文章:

    scala - 现有答案的柯里化(Currying)函数无效

    scala - 使用 Doobie 在单个事务中进行多个查询?

    apache-spark - spark datasax cassandra 连接器从沉重的 cassandra 表读取速度慢

    maven - Spark Streaming + json4s-jackson 依赖问题

    scala - 从数据框 Spark 中删除一列

    performance - 如何知道哪个计数查询最快?

    postgresql - 使用 Anorm 执行更新返回 PSQLException : The column index is out of range: 2, 列数:1

    python 2.7 : create dictionary from list of sets

    apache-spark - pyspark.sql.utils.AnalysisException : Column ambiguous but no duplicate column names

    scala - Databricks SCALA UDF 在注册函数时无法加载类