描述:
我们的spark版本是1.4.1
我们想要连接两个巨大的 RDD,其中一个带有倾斜数据。所以spark rdd操作join可能会导致内存问题。我们尝试将较小的一个分割成多个片段,然后分批广播它们。在每个广播回合中,我们尝试将较小的rdd的一部分收集到驱动程序,然后将其保存到HashMap,然后广播HashMap。每个执行器使用广播值对较大的rdd进行map操作。我们通过这种方式实现倾斜数据连接。
但是当它每轮处理广播值时。我们发现处理后我们不能破坏我们的广播值。如果我们使用broadcast.destroy(),下一轮我们处理数据将 触发错误。像这样:
java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)
我们查看了spark的源码,发现这个问题是由rdd依赖关系导致的。 if rdd3 -> rdd2 -> rdd1 (箭头显示依赖关系)。 rdd1是使用名为b1的广播变量生成的,rdd2使用b2生成的。生成rdd3时,源代码显示需要序列化b1和b2。如果b1或b2在rdd3生产过程之前被破坏。它将导致我上面列出的失败。
问题:
是否存在一种方法可以让rdd3忘记其依赖关系,使其在生成过程中不需要b1、b2,只需要rdd2?
或者是否存在处理倾斜连接问题的方法?
顺便说一句,我们为每个回合设置了检查点。并将spark.cleaner.ttl设置为600。问题仍然存在。如果我们不销毁广播变量,执行器将在第 5 回合失败。
我们的代码是这样的:
for (int i = 0; i < times; i++) {
JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;
List<Tuple2<String, Double>> itemSplit = itemZippedRdd
.filter(new FilterByHashFunction(times, i))
.collect();
Map<String, Double> itemSplitMap = new HashMap<String, Double>();
for (Tuple2<String, Double> item : itemSplit) {
itemSplitMap.put(item._1(), item._2());
}
Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
.broadcast(itemSplitMap);
curItemPairRdd = prevItemPairRdd
.mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
.persist(StorageLevel.DISK_ONLY());
curItemPairRdd.count();
itemSplitBroadcast.destroy(true);
itemSplit.clear();
}
最佳答案
我个人会尝试一些不同的方法。让我们从一个小的模拟数据集开始
import scala.util.Random
Random.setSeed(1)
val left = sc.parallelize(
Seq.fill(200)(("a", Random.nextInt(100))) ++
Seq.fill(150)(("b", Random.nextInt(100))) ++
Seq.fill(100)(Random.nextPrintableChar.toString, Random.nextInt(100))
)
并按键计数:
val keysDistribution = left.countByKey
进一步假设第二个 RDD 是均匀分布的:
val right = sc.parallelize(
keysDistribution.keys.flatMap(x => (1 to 5).map((x, _))).toSeq)
并将每个键可以处理的值数量阈值设置为 10:
val threshold = 10
使用代理键来增加粒度。
想法很简单。我们可以使用
((k, i), v)
,而不是连接(k, v)
对,其中i
是一个整数,取决于给定k
的阈值和元素数量。val buckets = keysDistribution.map{ case (k, v) => (k -> (v / threshold + 1).toInt) } // Assign random i to each element in left val leftWithSurrogates = left.map{case (k, v) => { val i = Random.nextInt(buckets(k)) ((k, i), v) }} // Replicate each value from right to i buckets val rightWithSurrogates = right.flatMap{case (k, v) => { (0 until buckets(k)).map(i => ((k, i), v)) }} val resultViaSurrogates = leftWithSurrogates .join(rightWithSurrogates) .map{case ((k, _), v) => (k, v)}
分而治之 - 分割处理频繁键和不频繁键。
首先让我们使用不常用的键加入:
val infrequentLeft = left.filter{ case (k, _) => keysDistribution(k) < threshold } val infrequentRight = right.filter{ case (k, _) => keysDistribution(k) < threshold } val infrequent = infrequentLeft.join(infrequentRight)
接下来让我们分别处理每个常用键:
val frequentKeys = keysDistribution .filter{case (_, v) => v >= threshold} .keys val frequent = sc.union(frequentKeys.toSeq.map(k => { left.filter(_._1 == k) .cartesian(right.filter(_._1 == k)) .map{case ((k, v1), (_, v2)) => (k, (v1, v2))} }))
最后让两个子集合并:
val resultViaUnion = infrequent.union(frequent)
快速健全性检查:
val resultViaJoin = left.join(right).sortBy(identity).collect.toList
require(resultViaUnion.sortBy(identity).collect.toList == resultViaJoin)
require(resultViaSurrogates.sortBy(identity).collect.toList == resultViaJoin)
显然,这只是一个草图,而不是一个完整的解决方案,但应该可以让您了解如何继续。与broadcast
相比,它的最大优势在于它消除了驱动程序瓶颈。
Does it exist way can let rdd3 forget its dependency and make it don't require b1, b2, only required rdd2 during its producing process?
您使用检查点和强制计算,但如果丢失任何分区,它仍然会失败。
关于apache-spark - 如何拆分一个巨大的rdd并轮流广播?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34656338/