apache-spark - 在循环内使用 sparkDF.write.saveAsTable() 会导致作业之间的延迟呈指数增长

标签 apache-spark hive pyspark

我需要在 for 循环中执行一组不同的 hive 查询。

hc=HiveContext(sc)
queryList=[set of queries]
for i in range(0,X):
    hc.sql(queryList[i])
    sparkDF.write.saveAsTable('hiveTable', mode='append')
尽管此代码对于较小的 X 值很有效,但当 X>100 时会导致问题。每个 saveAsTable 作业之间的延迟呈指数增长,但每个作业或多或少需要大约恒定的 5 秒。

The things I tried to rectify this without any luck:

  1. Add a gc.collect() inside the for loop once (i%100==0). But this breaks the FOR loop
  2. Close the current Spark and Hive context once (i%100==0) and create a new ones - this still doesn't solve the problem
  3. Use yarn-cluster instead of yarn-client - no luck!

有没有类似的选项,每次我调用 saveAsTable 函数时,我都会创建一个到 hive 的连接并关闭它?还是清理驱动?

最佳答案

发生这种情况是因为您正在使用在 spark 驱动程序模式下执行的 for 循环而不是在集群工作节点上分发,这意味着它没有使用并行的力量或不在工作节点上执行。尝试使用带有分区的并行化创建 RDD,这将有助于在工作节点上生成作业
或者,如果您只想处理 hiveContext,您可以创建全局 HiveContext,如 Val hiveCtx = new HiveContext(sc) 并在循环内重用。
您还可以在集群上运行作业时更改/优化执行程序的数量以提高性能

关于apache-spark - 在循环内使用 sparkDF.write.saveAsTable() 会导致作业之间的延迟呈指数增长,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43442364/

相关文章:

apache-spark - Oozie Spark(2.x)操作始终处于接受状态

mysql - 无法删除配置单元表

sql - Hive:通过查找组中的最大值

python - 如何在 pyspark 操作中轻松使用自定义类方法?

pyspark - Spark 数据帧 CSV 与 Parquet

scala - 在 Spark DataFrame 中基于旧列添加新列

hadoop - 在spark-submit执行时覆盖core-site.xml属性值

scala - java.lang.RuntimeException:java.lang.String不是bigint或int模式的有效外部类型

hadoop - 如何配置配置单元服务器以远程模式运行?

python - 如何在 pyspark 中按字母顺序对嵌套结构的列进行排序?