scala - Spark : Broadcast usage on local mode

标签 scala apache-spark apache-spark-sql spark-dataframe

我知道广播允许在每台机器上缓存一个只读副本,而不是将它的副本与任务一起发送。 但是,我想知道广播在本地模式下使用时是否有任何巨大影响,因为我没有节点集群。 或者在本地模式下不广播就可以使用吗?我只是想了解它的用法。

Spark 版本 #2.0,Scala 版本 #2.10 本地模式 - 8 核 CPU 64GB RAM

我有如下内容:

case class EmpDim(name: String,age: Int)

empDF
+-----+-------+------+
|EmpId|EmpName|EmpAge|
+-----+-------+------+
|    1|   John|    32|
|    2|  David|    45|
+-----+-------+------+

deptDF
+------+--------+-----+
|DeptID|DeptName|EmpID|
+------+--------+-----+
|     1|   Admin|    1|
|     2|      HR|    2|
|     3| Finance|    4|
+------+--------+-----+

val empRDD = empDF.rdd.map(x => (x.getInt(0), EmpDim(x.getString(1), x.getInt(2))))

val lookupMap = empRDD.collectAsMap() //Without Broadcast
val broadCastLookupMap: Broadcast[Map[Int,EmpDim]] = sc.broadcast(empRDD.collectAsMap()) //With Broadcast

def lookup(lookupMap:Map[Int,EmpDim]) = udf[Option[EmpDim],Int]((empID:Int) => lookupMap.lift(empID))

val combinedDF = deptDF.withColumn("lookupEmp",lookup(lookupMap)($"EmpID")) //Without Broadcast
                       .withColumn("broadCastLookupEmp",lookup(broadCastLookupMap.value)($"EmpID")) //With Broadcast
                       .withColumn("EmpName",coalesce($"lookupEmp.name",lit("Unknown - No Name to Lookup")))
                       .withColumn("EmpAge",coalesce($"lookupEmp.age",lit("Unknown - No Age to Lookup")))
                       .drop("lookupEmp")
                       .drop("broadCastLookupEmp")

+------+--------+-----+---------------------------+--------------------------+
|DeptID|DeptName|EmpID|EmpName                    |EmpAge                    |
+------+--------+-----+---------------------------+--------------------------+
|1     |Admin   |1    |John                       |32                        |
|2     |HR      |2    |David                      |45                        |
|3     |Finance |4    |Unknown - No Name to Lookup|Unknown - No Age to Lookup|
+------+--------+-----+---------------------------+--------------------------+

在上述情况下,是否建议使用广播,还是有点矫枉过正?请指教

最佳答案

像这样使用时,广播根本没有任何值(value)。

当你打电话时:

lookup(broadCastLookupMap.value)($"EmpID")

broadCastLookupMap.value 将根据 Scala 替换模型在本地进行评估。

正确的实现应该是:

def lookup(lookupMap: Broadcast[Map[Int, EmpDim]]) = udf[Option[EmpDim],Int](
  (empID:Int) => lookupMap.value.lift(empID)
)

并调用:

lookup(broadCastLookupMap)($"EmpID")

这可能会产生一些积极的影响,具体取决于实际的执行计划。本地或非本地模式 - 适用相同的规则

  • 如果数据在阶段之间重复使用(显式或隐式),广播会很有用。
  • 如果数据在管道中只使用一次,标准的闭包/参数处理机制就足够了。

这里没有任何迹象表明第一种情况,因此广播应该已过时,但如果您想确定,请使用实时环境测试这两种解决方案并比较结果。

按名字调用也应该有效:

def lookup(lookupMap: => Map[Int,EmpDim]) = udf[Option[EmpDim],Int](
  (empID:Int) => lookupMap.lift(empID)
)

关于scala - Spark : Broadcast usage on local mode,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49904770/

相关文章:

scala - 如何使用 reduceByKey 将值添加到 Scala Spark 中的 Set 中?

java - 转换 Spark 数据集中的数据时数据类型不匹配

scala - 将 parquet 文件加载到 Spark 中的案例类中的性能

java - 在 Spark UDF JAVA 中传递额外变量

scala - Option [T]来自Scala的什么地方?

scala - 你能用 Scala 进行逻辑编程吗?

xml - 如何更改 Scala XML 元素的属性

python - pyspark countApprox() 似乎与 count() 没有区别

scala - 在执行 pivot spark 之前分组并查找计数

apache-spark - 如何在不指定所有列名的情况下将多个列值更改为常量?