scala - 如何引用范围之外的 Spark 广播变量

标签 scala apache-spark

我看到的所有 Spark 广播变量示例都在使用它们的函数范围内定义它们( map()join() 等)。我想同时使用 map()功能和mapPartitions()引用广播变量的函数,但我想将它们模块化,以便可以将相同的函数用于单元测试。

  • 我怎样才能做到这一点?

  • 我的一个想法是对函数进行柯里化(Currying),以便在使用 map 时传递对广播变量的引用。或 mapPartitions称呼。
  • 在原始范围内定义函数时,传递对广播变量的引用是否会对性能产生影响?

  • 我有这样的想法(伪代码):
    // firstFile.scala
    // ---------------
    
    def mapper(bcast: Broadcast)(row: SomeRow): Int = {
      bcast.value(row._1)
    }
    
    def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator {
      val broadcastVariable = bcast.value
    
      for {
        i <- iter
      } yield broadcastVariable(i)
    })
    
    
    // secondFile.scala
    // ----------------
    
    import firstFile.{mapMyPartition, mapper}
    
    val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
    
    rdd
     .map(mapper(bcastVariable))
     .mapPartitions(mapMyPartition(bcastVariable))
    

    最佳答案

    您的解决方案应该可以正常工作。在这两种情况下,函数都传递给 map{Partitions}序列化时将包含对广播变量本身的引用,但不包含对其值的引用,并且仅调用 bcast.value在节点上计算时。

    需要避免的是

    def mapper(bcast: Broadcast): SomeRow => Int = {
      val value = bcast.value
      row => value(row._1)
    }
    

    关于scala - 如何引用范围之外的 Spark 广播变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36849204/

    相关文章:

    scala - Scala 中的 Comonad 示例

    scala - 在Scala中将元组转换为数组

    apache-spark - Spark ml 2.0 - 朴素贝叶斯 - 如何确定每个类别的阈值

    apache-spark - pyspark作业参数中的--archives,-files,py-files之间有什么区别

    apache-spark - Apache Spark 按 DF 分组,将值收集到列表中,然后按列表分组

    scala - 在 Scala 中,如何为 Java 中定义的类定义伴随对象?

    scala - 将函数的第一个参数旋转为第 n 个

    hadoop - 如何在 Hadoop yarn 上设置 apache shark?

    xml - 使用 Scala 替代 XSLT?

    database - 针对不断变化的大型数据集发出实时警报