scala - Spark : Faster way to join two dataframe?

标签 scala apache-spark

我有两个数据框df1ip2Countrydf1 包含 IP 地址,我正在尝试将 IP 地址映射到地理位置信息,例如经度纬度,其中是 ip2Country 中的列。

我将其作为 Spark 提交作业运行,但尽管 df1 只有不到 2500 行,但操作仍花费了很长时间。

我的代码:

val agg =df1.join(ip2Country, ip2Country("network_start_int")=df1("sint")
, "inner")
.select($"src_ip"
,$"country_name".alias("scountry")
,$"iso_3".alias("scode")
,$"longitude".alias("slong")
,$"latitude".alias("slat")
,$"dst_ip",$"dint",$"count")
.filter($"slong".isNotNull)

val agg1 =agg.join(ip2Country, ip2Country("network_start_int")=agg("dint")
, "inner")
.select($"src_ip",$"scountry"
,$"scode",$"slong"
,$"slat",$"dst_ip"
,$"country_name".alias("dcountry")
,$"iso_3".alias("dcode")
,$"longitude".alias("dlong")
,$"latitude".alias("dlat"),$"count")
.filter($"dlong".isNotNull)

还有其他方法可以连接两个表吗?还是我的方法不对?

最佳答案

如果您有一个大数据帧需要与一个小数据帧连接 - 广播连接非常有效。阅读此处:Broadcast Joins (aka Map-Side Joins)

bigdf.join(broadcast(smalldf))

关于scala - Spark : Faster way to join two dataframe?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45975620/

相关文章:

scala - Scala map 中有 findall 吗?

scala - 为什么 Spark 应用程序以 “ClassNotFoundException: Failed to find data source: kafka” 作为带有 sbt 程序集的 uber-jar 失败?

apache-spark - 分析异常 : u'Cannot resolve column name

apache-spark - Spark 是否在内部跨节点分发数据帧?

apache-spark - 如何在一列上聚合并在 pyspark 中最大限度地利用其他列?

windows - 为什么 spark-shell 失败并显示 "' ""C:\Program' 在 Windows 上不被识别为内部或外部命令?

scala - -Ywarn-unused-import 在 Play 路线文件上触发

Scala 如何使用 Map 将方法存储为值

Scala 类型参数被推断为元组

scala - 使用自定义 Hadoop 输入格式在 Spark 中处理二进制文件