apache-spark - 如何拆分一个巨大的rdd并轮流广播?

标签 apache-spark

描述:

我们的spark版本是1.4.1

我们想要连接两个巨大的 RDD,其中一个带有倾斜数据。所以spark rdd操作join可能会导致内存问题。我们尝试将较小的一个分割成多个片段,然后分批广播它们。在每个广播回合中,我们尝试将较小的rdd的一部分收集到驱动程序,然后将其保存到HashMap,然后广播HashMap。每个执行器使用广播值对较大的rdd进行map操作。我们通过这种方式实现倾斜数据连接。

但是当它每轮处理广播值时。我们发现处理后我们不能破坏我们的广播值。如果我们使用broadcast.destroy(),下一轮我们处理数据将 触发错误。像这样:

java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)

我们查看了spark的源码,发现这个问题是由rdd依赖关系导致的。 if rdd3 -> rdd2 -> rdd1 (箭头显示依赖关系)。 rdd1是使用名为b1的广播变量生成的,rdd2使用b2生成的。生成rdd3时,源代码显示需要序列化b1和b2。如果b1或b2在rdd3生产过程之前被破坏。它将导致我上面列出的失败。

问题:

是否存在一种方法可以让rdd3忘记其依赖关系,使其在生成过程中不需要b1、b2,只需要rdd2?

或者是否存在处理倾斜连接问题的方法?

顺便说一句,我们为每个回合设置了检查点。并将spark.cleaner.ttl设置为600。问题仍然存在。如果我们不销毁广播变量,执行器将在第 5 回合失败。

我们的代码是这样的:

for (int i = 0; i < times; i++) {
    JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;
    List<Tuple2<String, Double>> itemSplit = itemZippedRdd
            .filter(new FilterByHashFunction(times, i))
            .collect();

    Map<String, Double> itemSplitMap = new HashMap<String, Double>();
    for (Tuple2<String, Double> item : itemSplit) {
        itemSplitMap.put(item._1(), item._2());
    }
    Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
            .broadcast(itemSplitMap);

    curItemPairRdd = prevItemPairRdd
            .mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
            .persist(StorageLevel.DISK_ONLY());
    curItemPairRdd.count();

    itemSplitBroadcast.destroy(true);
    itemSplit.clear();

}

最佳答案

我个人会尝试一些不同的方法。让我们从一个小的模拟数据集开始

import scala.util.Random
Random.setSeed(1)

val left = sc.parallelize(
  Seq.fill(200)(("a", Random.nextInt(100))) ++ 
  Seq.fill(150)(("b",  Random.nextInt(100))) ++ 
  Seq.fill(100)(Random.nextPrintableChar.toString, Random.nextInt(100))
)

并按键计数:

val keysDistribution = left.countByKey

进一步假设第二个 RDD 是均匀分布的:

val right = sc.parallelize(
  keysDistribution.keys.flatMap(x => (1 to 5).map((x, _))).toSeq)

并将每个键可以处理的值数量阈值设置为 10:

val threshold = 10
  1. 使用代理键来增加粒度。

    想法很简单。我们可以使用 ((k, i), v),而不是连接 (k, v) 对,其中 i 是一个整数,取决于给定 k 的阈值和元素数量。

    val buckets = keysDistribution.map{
      case (k, v) => (k -> (v / threshold + 1).toInt)
    }
    
    // Assign random i to each element in left
    val leftWithSurrogates = left.map{case (k, v) => {
      val i = Random.nextInt(buckets(k))
      ((k, i), v)
    }}
    
    // Replicate each value from right to i buckets
    val rightWithSurrogates = right.flatMap{case (k, v) => {
      (0 until buckets(k)).map(i => ((k, i), v))
    }}
    
    val resultViaSurrogates = leftWithSurrogates
      .join(rightWithSurrogates)
      .map{case ((k, _), v) => (k, v)}
    
  2. 分而治之 - 分割处理频繁键和不频繁键。

    首先让我们使用不常用的键加入:

    val infrequentLeft = left.filter{
      case (k, _) => keysDistribution(k) < threshold
    }
    
    val infrequentRight = right.filter{
      case (k, _) => keysDistribution(k) < threshold
    }
    
    val infrequent = infrequentLeft.join(infrequentRight)
    

    接下来让我们分别处理每个常用键:

    val frequentKeys = keysDistribution
      .filter{case (_, v) => v >= threshold}
      .keys
    
    val frequent = sc.union(frequentKeys.toSeq.map(k => {
      left.filter(_._1 == k)
        .cartesian(right.filter(_._1 == k))
        .map{case ((k, v1), (_, v2)) => (k, (v1, v2))}
    }))
    

    最后让两个子集合并:

    val resultViaUnion = infrequent.union(frequent)
    

快速健全性检查:

val resultViaJoin = left.join(right).sortBy(identity).collect.toList

require(resultViaUnion.sortBy(identity).collect.toList == resultViaJoin)
require(resultViaSurrogates.sortBy(identity).collect.toList == resultViaJoin)

显然,这只是一个草图,而不是一个完整的解决方案,但应该可以让您了解如何继续。与broadcast相比,它的最大优势在于它消除了驱动程序瓶颈。

Does it exist way can let rdd3 forget its dependency and make it don't require b1, b2, only required rdd2 during its producing process?

您使用检查点和强制计算,但如果丢失任何分区,它仍然会失败。

关于apache-spark - 如何拆分一个巨大的rdd并轮流广播?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34656338/

相关文章:

logging - 监控 Apache Spark 日志和动态应用程序/驱动程序日志

scala - 了解 UID 在 Spark MLLib Transformer 中的作用

hadoop - Spark的API newHadoopRDD到底做什么?

hadoop - 损坏的 Parquet 文件

java - Spark streaming mapWithState 超时延迟?

scala - 简单esRDD(Spark中使用的Elasticsearch-hadoop连接器)引发了异常

apache-spark - Spark SQL 和使用现有的配置单元 udfs

python - 将 PySpark DF 写入专用格式的文件

scala - 重载的方法值在scala中应用替代错误

scala - 定义接受 Spark DataFrame 中的对象数组的 UDF?