我是 Spark 新手,正在尝试找到一种方法将信息从一个 rdd 集成到另一个 rdd 中,但它们的结构不适合标准连接函数
我有这种格式的rdd:
[{a:a1, b:b1, c:[1,2,3,4], d:d1},
{a:a2, b:b2, c:[5,6,7,8], d:d2}]
以及另一种这种格式:
[{1:x1},{2,x2},{3,x3},{4,x4},{5,x5},{6,x6},{7,x7},{8,x8}]
我想将第二个 rdd 中的值与第一个 rdd 中的键(位于 c 键的列表值中)进行匹配。我知道如何操纵它们,所以我不太关心最终的输出,但我可能希望看到这样的东西:
[{a:a1, b:b1, c:[1,2,3,4],c0: [x1,x2,x3,x4], d:d1},
{a:a2, b:b2, c:[5,6,7,8],c0: [x5,x6,x7,x8], d:d2}]
或者这个:
[{a:a1, b:b1, c:[(1,x1),(2,x2),(3,x3),(4,x4)], d:d1},
{a:a2, b:b2, c:[(5,x5),(6,x6),(7,x7),(8,x8)], d:d2}]
或任何其他可以将第二个 rdd 中的键与第一个 rdd 中的值匹配的内容。我考虑过将第二个 rdd 制作成字典,我知道如何使用它,但我只是认为我的数据太大了。
非常感谢,我真的很感激。
最佳答案
join
在 flatMap
之后,或者 cartesian
进行了太多的洗牌。
可能的解决方案之一是在 groupBy
之后使用 cartesian
和 HashPartitioner
。
(抱歉,这是 scala
代码)
val rdd0: RDD[(String, String, Seq[Int], String)]
val rdd1: RDD[(Int, String)]
val partitioner = new HashPartitioner(rdd0.partitions.size)
// here is the point!
val grouped = rdd1.groupBy(partitioner.getPartition(_))
val result = rdd0.cartesian(grouped).map { case (left, (_, right)) =>
val map = right.toMap
(left._1, left._2, left._4) -> left._3.flatMap(v => map.get(v).map(v -> _))
}.groupByKey().map { case (key, value) =>
(key._1, key._2, value.flatten.toSeq, key._3)
}
关于join - Spark 中的复杂连接 : rdd elements have many key-value pairs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30260014/