scala - 如何在 Spark 中访问广播的 DataFrame

标签 scala apache-spark

我创建了两个数据框,它们来自 Hive 表(PC_ITM 和 ITEM_SELL)并且大小很大,我正在使用它们
经常在 SQL 查询中通过注册为表。但是由于这些很大,所以需要很多时间
获取查询结果。所以我将它们保存为 Parquet 文件,然后读取它们并注册为临时表。但我仍然没有获得良好的性能,所以我广播了这些数据帧,然后注册为如下表。

PC_ITM_DF=sqlContext.parquetFile("path")
val PC_ITM_BC=sc.broadcast(PC_ITM_DF)
val PC_ITM_DF1=PC_ITM_BC
PC_ITM_DF1.registerAsTempTable("PC_ITM")

ITM_SELL_DF=sqlContext.parquetFile("path")
val ITM_SELL_BC=sc.broadcast(ITM_SELL_DF)
val ITM_SELL_DF1=ITM_SELL_BC.value
ITM_SELL_DF1.registerAsTempTable(ITM_SELL)


sqlContext.sql("JOIN Query").show

但是我仍然无法实现性能,它花费的时间与不广播这些数据帧的时间相同。

谁能判断这是否是广播和使用它的正确方法?`

最佳答案

您实际上并不需要“访问”广播数据帧——您只需使用它,Spark 将在幕后实现广播。 broadcast function 效果很好,而且比 sc.broadcast 更有意义方法。

如果您一次评估所有内容,可能很难理解时间都花在了什么地方。

您可以将代码分解为多个步骤。这里的关键是执行一个操作并在您在加入中使用它们之前保留您想要广播的数据帧。

// load your dataframe
PC_ITM_DF=sqlContext.parquetFile("path")

// mark this dataframe to be stored in memory once evaluated
PC_ITM_DF.persist()

// mark this dataframe to be broadcast
broadcast(PC_ITM_DF)

// perform an action to force the evaluation
PC_ITM_DF.count()

这样做将确保数据帧是
  • 加载到内存中(持久化)
  • 注册为临时表以用于 SQL 查询
  • 标记为广播,因此将发送给所有执行者

  • 当你现在运行 sqlContext.sql("JOIN Query").show您现在应该在 Spark UI 的 SQL 选项卡中看到“广播哈希联接”。

    关于scala - 如何在 Spark 中访问广播的 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34931272/

    相关文章:

    apache-spark - Spark中的分区数和并发任务数是如何计算的

    scala - 如何在 Scala 中将整数转换为字符串?

    Scala代码不获取s3文件

    scala - 在Scala中,如何将导入语句传递给子类?

    scala - 为什么scala不能推断Left[X,A]是Either[X,B]的合理子类型?

    apache-spark - 如何在将 CSV 读取到 Spark 中的数据帧时指定多个 TimestampType 和 DataType 格式?

    scala - 测试以确保两个函数同时运行?

    performance - 使用 Graphite+Grafana 测量服务的正常运行时间

    在python中按时加入两个 Spark 数据帧(TimestampType)

    scala - 使用 flatmap 将一行 spark 数据集分解为多行并添加列