scala - Spark : Efficient mass lookup in pair RDD's

标签 scala apache-spark

在 Apache Spark 中,我有两个 RDD。第一data : RDD[(K,V)]包含键值形式的数据。第二个pairs : RDD[(K,K)]包含该数据的一组有趣的 key 对。

如何高效地构建 RDD pairsWithData : RDD[((K,K)),(V,V))] ,这样它就包含了来自 pairs 的所有元素作为键元组及其相应的值(来自 data )作为值元组?

数据的一些属性:

  • data 中的 key 独一无二
  • pairs 中的所有条目独一无二
  • 对于所有对 (k1,k2)pairs保证k1 <= k2
  • 'pairs' 的大小只是数据大小的常数 |pairs| = O(|data|)
  • 当前数据大小(预计会增长):|data| ~ 10^8, |pairs| ~ 10^10

  • 当前尝试

    以下是 Scala 中的一些示例代码:
    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._
    
    // This kind of show the idea, but fails at runtime.
    def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
      keyPairs map {case (k1,k2) =>
        val v1 : String = data lookup k1 head;
        val v2 : String = data lookup k2 head;
        ((k1, k2), (v1,v2))
      }
    }
    
    // Works but is O(|data|^2)
    def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
      // Construct all possible pairs of values
      val cartesianData = data cartesian data map {case((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2))}
      // Select only the values who's keys are in keyPairs
      keyPairs map {(_,0)} join cartesianData mapValues {_._2}
    }
    
    // Example function that find pairs of keys
    // Runs in O(|data|) in real life, but cannot maintain the values
    def relevantPairs(data : RDD[(Int, String)]) = {
      val keys = data map (_._1)
      keys cartesian keys filter {case (x,y) => x*y == 12 && x < y}
    }
    
    // Example run
    val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
    val pairs = relevantPairs(data)
    val pairsWithData = massPairLookup2(pairs, data) 
    
    
    // Print: 
    // ((1,12),(Number1,Number12))
    // ((2,6),(Number2,Number6))
    // ((3,4),(Number3,Number4))
    pairsWithData.foreach(println)
    

    尝试 1

    首先我尝试只使用 lookup功能在 data ,但在执行时会引发运行时错误。好像selfPairRDDFunctions 中为空特征。

    另外我不确定lookup的性能. The documentation说如果 RDD 有一个已知的分区器,那么这个操作就可以有效地完成,只需搜索键映射到的分区。这听起来像 n查找最多需要 O(n*|partition|) 时间,我怀疑可以优化。

    尝试 2

    此尝试有效,但我创建了 |data|^2对会降低性能。我不希望 Spark 能够优化它。

    最佳答案

    您的查找 1 不起作用,因为您无法在工作人员内部(在另一个转换中)执行 RDD 转换。

    在查找 2 中,我认为没有必要执行完整的笛卡尔...

    你可以这样做:

    val firstjoin = pairs.map({case (k1,k2) => (k1, (k1,k2))})
        .join(data)
        .map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)})
    val result = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))})
        .join(data)
        .map({case(_, (((k1,k2), v1), v2))=>((k1, k2), (v1, v2))})
    

    或者以更密集的形式:
        val firstjoin = pairs.map(x => (x._1, x)).join(data).map(_._2)
        val result = firstjoin.map({case (x,y) => (x._2, (x,y))})
            .join(data).map({case(x, (y, z))=>(y._1, (y._2, z))})
    

    我不认为你可以更有效地做到这一点,但我可能是错的......

    关于scala - Spark : Efficient mass lookup in pair RDD's,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27983215/

    相关文章:

    math - ScalaNumber 的实现如何在底层工作?

    scala - 获取 TrainValidationSplit scala 的最佳参数

    apache-spark - 为什么无法在minikube/kubernetes上实例化运行外部Spark的外部调度程序?

    scala - shapeless 将 case 类转换为 HList 并跳过所有选项字段

    scala - Scala Spark 属性的最佳实践

    scala - Scala 中的 foldLeft 如何在 DataFrame 上工作?

    python - PySpark 中的 mkString 等价物是什么?

    python - 将 Spark Structure Streaming DataFrames 转换为 Pandas DataFrame

    apache-spark - 使用 Spark SQL 查询 Hive 分区中子目录中的数据

    java - 为什么 Java 的 DateTimeFormatter 不能安全地解析/格式化 Instant 上的往返?