performance - spark中熵的高效计算

标签 performance scala apache-spark entropy information-theory

给定一个 RDD(数据)和一个用于计算熵的索引字段列表。执行以下流程时,在 2MB(16k 行)源上计算单个熵值大约需要 5 秒。

def entropy(data: RDD[Array[String]], colIdx: Array[Int], count: Long): Double = { 
  println(data.toDebugString)
    data.map(r => colIdx.map(idx => r(idx)).mkString(",") -> 1)
        .reduceByKey(_ + _)
        .map(v => {
        val p = v._2.toDouble / count
        -p * scala.math.log(p) / scala.math.log(2)
      })
        .reduce((v1, v2) => v1 + v2)
}

debugString 的输出如下:
(entropy,MappedRDD[93] at map at Q.scala:31 (8 partitions)
  UnionRDD[72] at $plus$plus at S.scala:136 (8 partitions)
    MappedRDD[60] at map at S.scala:151 (4 partitions)
      FilteredRDD[59] at filter at S.scala:150 (4 partitions)
        MappedRDD[40] at map at S.scala:124 (4 partitions)
          MapPartitionsRDD[39] at mapPartitionsWithIndex at L.scala:356 (4 partitions)
            FilteredRDD[27] at filter at S.scala:104 (4 partitions)
              MappedRDD[8] at map at X.scala:21 (4 partitions)
                MappedRDD[6] at map at R.scala:39 (4 partitions)
                  FlatMappedRDD[5] at objectFile at F.scala:51 (4 partitions)
                    HadoopRDD[4] at objectFile at F.scala:51 (4 partitions)
    MappedRDD[68] at map at S.scala:151 (4 partitions)
      FilteredRDD[67] at filter at S.scala:150 (4 partitions)
        MappedRDD[52] at map at S.scala:124 (4 partitions)
          MapPartitionsRDD[51] at mapPartitionsWithIndex at L.scala:356 (4 partitions)
            FilteredRDD[28] at filter at S.scala:105 (4 partitions)
              MappedRDD[8] at map at X.scala:21 (4 partitions)
                MappedRDD[6] at map at R.scala:39 (4 partitions)
                  FlatMappedRDD[5] at objectFile at F.scala:51 (4 partitions)
                    HadoopRDD[4] at objectFile at F.scala:51 (4 partitions),colIdex,13,count,3922)

如果我收集 RDD 并行化 再次需要大约 150 毫秒的时间来计算(对于一个简单的 2MB 文件来说,这似乎仍然很高) - 并且在处理多个 GB 数据时显然会带来挑战。我缺少什么才能正确使用 Spark 和 Scala?

我最初的实现(表现更糟):
data.map(r => colIdx
  .map(idx => r(idx)).mkString(","))
  .groupBy(r => r)
  .map(g => g._2.size)
  .map(v => v.toDouble / count)
  .map(v => -v * scala.math.log(v) / scala.math.log(2))
  .reduce((v1, v2) => v1 + v2)

最佳答案

首先看起来您的代码中存在错误,您需要处理 p0所以-p * math.log(p) / math.log(2)应该是 if (p == 0.0) 0.0 else -p * math.log(p) / math.log(2) .

其次,您可以使用基数 e,您实际上并不需要基数为 2。

无论如何,您的代码缓慢的原因可能是由于分区太少。每个 CPU 应该至少有 2-4 个分区,实际上我经常使用更多。你有多少个 CPU?

现在可能花费最长时间的不是熵计算,因为它非常微不足道——而是 reduceByKey这是在 String 上完成的键。是否可以使用其他一些数据类型? colIdx 究竟是什么? r究竟是什么?

最后一个观察结果是您使用此 colIdx.map(r.apply) 多次索引每条记录...你知道如果 r 这会很慢不是类型 ArrayIndexedSeq ... 如果是 List它将是 O(index),因为您必须遍历列表以获取所需的索引。

关于performance - spark中熵的高效计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24837928/

相关文章:

apache-spark - Spark 重新分区落入单个分区

scala - 从终端在 Spark scala 中添加外部库

scala - Spark缓存的RDD计算n次

scala - scala 中 null 的相等性,odersky 书的解释似乎与 scala 代码不同?

scala - 柯里化(Currying)应用于特质伴生对象

scala - 使 ScalaCheck 测试具有确定性

c# - 取消嵌套列表迭代以提高性能

java - Java集合栈数据结构的效率

按性能条件分组的 MySql 查询

scala - 使用谓词下推连接两个数据集