join - Spark 中的复杂连接 : rdd elements have many key-value pairs

标签 join rdd pyspark

我是 Spark 新手,正在尝试找到一种方法将信息从一个 rdd 集成到另一个 rdd 中,但它们的结构不适合标准连接函数

我有这种格式的rdd:

[{a:a1, b:b1, c:[1,2,3,4], d:d1},
 {a:a2, b:b2, c:[5,6,7,8], d:d2}]

以及另一种这种格式:

[{1:x1},{2,x2},{3,x3},{4,x4},{5,x5},{6,x6},{7,x7},{8,x8}]

我想将第二个 rdd 中的值与第一个 rdd 中的键(位于 c 键的列表值中)进行匹配。我知道如何操纵它们,所以我不太关心最终的输出,但我可能希望看到这样的东西:

[{a:a1, b:b1, c:[1,2,3,4],c0: [x1,x2,x3,x4], d:d1},
 {a:a2, b:b2, c:[5,6,7,8],c0: [x5,x6,x7,x8], d:d2}]

或者这个:

[{a:a1, b:b1, c:[(1,x1),(2,x2),(3,x3),(4,x4)], d:d1},
 {a:a2, b:b2, c:[(5,x5),(6,x6),(7,x7),(8,x8)], d:d2}]

或任何其他可以将第二个 rdd 中的键与第一个 rdd 中的值匹配的内容。我考虑过将第二个 rdd 制作成字典,我知道如何使用它,但我只是认为我的数据太大了。

非常感谢,我真的很感激。

最佳答案

joinflatMap 之后,或者 cartesian 进行了太多的洗牌。

可能的解决方案之一是在 groupBy 之后使用 cartesianHashPartitioner

(抱歉,这是 scala 代码)

val rdd0: RDD[(String, String, Seq[Int], String)]
val rdd1: RDD[(Int, String)]

val partitioner = new HashPartitioner(rdd0.partitions.size)

// here is the point!
val grouped = rdd1.groupBy(partitioner.getPartition(_))

val result = rdd0.cartesian(grouped).map { case (left, (_, right)) =>
    val map = right.toMap
    (left._1, left._2, left._4) -> left._3.flatMap(v => map.get(v).map(v -> _))
}.groupByKey().map { case (key, value) =>
    (key._1, key._2, value.flatten.toSeq, key._3)
}

关于join - Spark 中的复杂连接 : rdd elements have many key-value pairs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30260014/

相关文章:

Mysql 用一个 select 短语连接多个表

apache-spark - 使用 pySpark 对 RDD 中数组类型的值进行排序

scala - Spark Dataset聚合类似于RDD aggregate(zero)(accum, combiner)

java - 未找到与带有 Base 的可序列化的 Product 对应的 Java 类

apache-spark - Databricks:如何将 %python 下的 Spark 数据帧转换为 %r 下的数据帧

mysql - 不用 GroupConcat 将多行合并为一

mysql - 使用不同的列名连接 MySQL 中的两个表

MySQL 对同一个表的重复连接

python - PySpark - 稀疏向量列到矩阵

apache-spark - reduceByKey 和 lambda