scala - 有没有办法优化spark sql代码?

标签 scala hadoop apache-spark apache-spark-sql

更新:

我正在使用 Spark sql 1.5.2。尝试读取许多 parquet 文件并过滤和聚合行 - 我的 hdfs 中的约 30 个文件中存储了约 35M 的行,处理时间超过 10 分钟

val logins_12 = sqlContext.read.parquet("events/2015/12/*/login")
val l_12 = logins_12.where("event_data.level >= 90").select(
    "pid", 
    "timestamp", 
    "event_data.level" 
    ).withColumn("event_date", to_date(logins_12("timestamp"))).drop("timestamp").toDF("pid",  "level", "event_date").groupBy("pid", "event_date").agg(Map("level"->"max")).toDF("pid", "event_date", "level")
l_12.first()   

我的 Spark 在两个节点集群中运行,每个集群有 8 个核心和 16Gb 内存,scala 输出让我认为计算仅在一个线程中运行:

scala> x.first()
[Stage 1:=======>                                               (50 + 1) / 368]

当我尝试使用 count() 而不是 first() 时,看起来有两个线程正在执行计算。这仍然比我预期的要少,因为可以并行处理大约 30 个文件

scala> l_12.count()   
[Stage 4:=====>                                                  (34 + 2) / 368]

我在 yarn 客户端模式下启动 Spark 控制台,其中 14g 用于执行程序,4g 用于驱动程序

./bin/spark-shell -Dspark.executor.memory=14g -Dspark.driver.memory=4g --master yarn-client

我的 Spark 默认配置:

spark.executor.memory              2g
spark.logConf                      true
spark.eventLog.dir                 maprfs:///apps/spark
spark.eventLog.enabled             true
spark.sql.hive.metastore.sharedPrefixes  com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni
spark.executor.extraClassPath
spark.yarn.historyServer.address  http://test-01:18080

rdd有200个分区

scala> logins_12.rdd.partitions.size
res2: Int = 368
scala> l_12.rdd.partitions.size
res0: Int = 200

有没有办法优化这段代码? 谢谢

最佳答案

这两种行为或多或少都是预期的。 Spark 相当懒惰,除非您触发操作,否则它不仅不会执行转换,而且如果不需要输出,它还可以跳过任务。由于 first 仅需要一个元素,因此它只能计算一个分区。这很可能是您在某个时刻只看到一个正在运行的线程的原因。

关于第二个问题,很可能是配置问题。假设 YARN 配置没有任何问题(我不使用 YARN,但 yarn.nodemanager.resource.cpu-vcores 看起来可能是问题的根源),这很可能是 Spark 默认值的问题。正如您可以在 Configuration guide 中读到的那样Yarn 上的 spark.executor.cores 默认设置为 1。两个工作线程提供两个正在运行的线程。

关于scala - 有没有办法优化spark sql代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35174621/

相关文章:

scala - 如何从 Scala 集合中的索引获取选项?

scala - 从 Spark 服务器执行 SFTP 时,大型机服务器上的记录级别数据截断

apache-spark - 将多个原始文件合并为单个 Parquet 文件

scala - 映射 Set 时避免意外删除重复项

java - 在 Reducer 中查找最常见的键,错误 : java. lang.ArrayIndexOutOfBoundsException:1

hadoop - 了解 hadoop 中默认调度程序的技术

hadoop - java.lang.NoSuchMethodError : org. apache.hadoop.fs.FsServerDefaults.<init>(JIISIZJLorg/apache/hadoop/util/DataChecksum$Type;)V

python - Windows 10 上的 Spark。 'Files\Spark\bin\..\jars"“\”未被识别为内部或外部命令

elasticsearch - Spark机器学习和Elasticsearch在Python中分析了 token /文本

scala - 如何使用 Selenium WebDriverWait 获得更有意义的失败消息/断言?