scala - 在 spark 中加入两个 RDD

标签 scala apache-spark

我有两个 rdd,一个 rdd 只有一列,另外有两列连接键上的两个 RDD 我添加了 0 的虚拟值,还有其他有效的方法可以使用 join 吗?

val lines = sc.textFile("ml-100k/u.data")
val movienamesfile = sc.textFile("Cml-100k/u.item")

val moviesid = lines.map(x => x.split("\t")).map(x => (x(1),0))
val test = moviesid.map(x => x._1)
val movienames = movienamesfile.map(x => x.split("\\|")).map(x => (x(0),x(1)))
val shit = movienames.join(moviesid).distinct()

编辑:

让我把这个问题转换成 SQL。比如说我有 table1 (moveid)table2 (movieid,moviename)。在 SQL 中,我们编写如下内容:

select moviename, movieid, count(1)
from table2 inner join table table1 on table1.movieid=table2.moveid 
group by ....

在 SQL 中 table1 只有一列,而 table2 有两列,join 仍然有效,Spark 中的加入方式相同来自两个 RDD 的 key 。

最佳答案

Join 操作仅在 PairwiseRDDs 上定义,这与 SQL 中的关系/表完全不同。 PairwiseRDD 的每个元素都是一个 Tuple2,其中第一个元素是 key,第二个元素是 value。两者都可以包含复杂对象,只要 key 提供有意义的 hashCode

如果您想在 SQL 中考虑这一点,您可以将 key 视为进入 ON 子句的所有内容,并且 value 包含选定的列。

SELECT table1.value, table2.value
FROM table1 JOIN table2 ON table1.key = table2.key

虽然这些方法乍一看很相似,但您可以使用另一种方法来表达一种方法,但它们有一个根本区别。当您查看 SQL 表并忽略约束时,所有列都属于同一类对象,而 PairwiseRDD 中的 keyvalue 具有明确的意思。

回到你的问题来使用 join 你需要 keyvalue。可以说比使用 0 作为占位符更干净的是使用 null 单例,但实际上没有办法解决它。

对于小数据,您可以使用过滤器以类似的方式广播加入:

val moviesidBD = sc.broadcast(
  lines.map(x => x.split("\t")).map(_.head).collect.toSet)

movienames.filter{case (id, _) => moviesidBD.value contains id}

但如果你真的想要 SQL 式连接,那么你应该简单地使用 SparkSQL。

val movieIdsDf = lines
   .map(x => x.split("\t"))
   .map(a => Tuple1(a.head))
   .toDF("id")

val movienamesDf = movienames.toDF("id", "name")

// Add optional join type qualifier 
movienamesDf.join(movieIdsDf, movieIdsDf("id") <=> movienamesDf("id"))

关于scala - 在 spark 中加入两个 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33321704/

相关文章:

scala - 如何使用 Scala Breeze 对向量执行逐元素标量运算?

scala - 在 Scala 的文件中有效地存储 Long 序列

scala - 为什么这个 LR 代码在 spark 上运行太慢?

scala - 将 Spark 数据帧保存为 Google Cloud Storage 中的 parquet 文件

scala - 将新数据 append 到分区 parquet 文件

scala - 迭代列表,返回当前元素、下一个元素以及当前元素之前的元素

scala - 不理解 Scala 分隔延续的类型 (A @cpsParam[B,C])

apache-spark - 如何在 spark Dataframe 中像 SQL 一样实现 EXISTS 条件

python - 用 DataFrame 中的 None/null 值替换空字符串

apache-spark - Zeppelin 0.8.2 - localRepoPath 应该有一个值