scala - 与普通方法相比,通过最小散列计算的 Spark Jaccard 相似度计算速度较慢

标签 scala apache-spark apache-spark-mllib

给定 2 个巨大的值列表,我正在尝试计算 jaccard similarity它们之间使用 Scala 在 Spark 中。

假设 colHashed1包含第一个值列表和 colHashed2包含第二个列表。

方法一(普通方法):

val jSimilarity = colHashed1.intersection(colHashed2).distinct.count/(colHashed1.union(colHashed2).distinct.count.toDouble)

方法2(使用minHashing):

我已经使用了解释的方法 here .
import java.util.zip.CRC32

def getCRC32 (s : String) : Int =
{
    val crc=new CRC32
    crc.update(s.getBytes)
    return crc.getValue.toInt & 0xffffffff
}

val maxShingleID = Math.pow(2,32)-1
def pickRandomCoeffs(kIn : Int) : Array[Int] =
{
  var k = kIn
  val randList = Array.fill(k){0}

  while(k > 0)
  {
    // Get a random shingle ID.

    var randIndex = (Math.random()*maxShingleID).toInt

    // Ensure that each random number is unique.
    while(randList.contains(randIndex))
    {
      randIndex = (Math.random()*maxShingleID).toInt
    }

    // Add the random number to the list.
    k = k - 1
    randList(k) = randIndex
   } 

   return randList
}

val colHashed1 = list1Values.map(a => getCRC32(a))
val colHashed2 = list2Values.map(a => getCRC32(a))

val nextPrime = 4294967311L
val numHashes = 10

val coeffA = pickRandomCoeffs(numHashes)
val coeffB = pickRandomCoeffs(numHashes)

var signature1 = Array.fill(numHashes){0}
for (i <- 0 to numHashes-1)
{
    // Evaluate the hash function.
    val hashCodeRDD = colHashed1.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))

    // Track the lowest hash code seen.
    signature1(i) = hashCodeRDD.min.toInt
}

var signature2 = Array.fill(numHashes){0}
for (i <- 0 to numHashes-1)
{
    // Evaluate the hash function.
    val hashCodeRDD = colHashed2.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))

    // Track the lowest hash code seen.
    signature2(i) = hashCodeRDD.min.toInt
}


var count = 0
// Count the number of positions in the minhash signature which are equal.
for(k <- 0 to numHashes-1)
{
  if(signature1(k) == signature2(k))
    count = count + 1
}  
val jSimilarity = count/numHashes.toDouble

就时间而言,方法 1 似乎总是优于方法 2。当我分析代码时,min() RDD 上的函数调用在方法 2 中需要花费大量时间,并且根据使用的散列函数的数量,该函数被多次调用。

与重复的 min() 函数调用相比,方法 1 中使用的交集和并集操作似乎工作得更快。

我不明白为什么 minHashing 在这里没有帮助。与普通方法相比,我希望 minHashing 工作得更快。我在这里做错了什么吗?

样本数据可查看here

最佳答案

JaccardSimilarity 与 MinHash 没有给出一致的结果:

import java.util.zip.CRC32

object Jaccard {
  def getCRC32(s: String): Int = {
    val crc = new CRC32
    crc.update(s.getBytes)
    return crc.getValue.toInt & 0xffffffff
  }

  def pickRandomCoeffs(kIn: Int, maxShingleID: Double): Array[Int] = {
    var k = kIn
    val randList = Array.ofDim[Int](k)

    while (k > 0) {
      // Get a random shingle ID.
      var randIndex = (Math.random() * maxShingleID).toInt
      // Ensure that each random number is unique.
      while (randList.contains(randIndex)) {
        randIndex = (Math.random() * maxShingleID).toInt
      }
      // Add the random number to the list.
      k = k - 1
      randList(k) = randIndex
    }
    return randList
  }


  def approach2(list1Values: List[String], list2Values: List[String]) = {

    val maxShingleID = Math.pow(2, 32) - 1

    val colHashed1 = list1Values.map(a => getCRC32(a))
    val colHashed2 = list2Values.map(a => getCRC32(a))

    val nextPrime = 4294967311L
    val numHashes = 10

    val coeffA = pickRandomCoeffs(numHashes, maxShingleID)
    val coeffB = pickRandomCoeffs(numHashes, maxShingleID)

    val signature1 = for (i <- 0 until numHashes) yield {
      val hashCodeRDD = colHashed1.map(ele => (coeffA(i) * ele + coeffB(i)) % nextPrime)
      hashCodeRDD.min.toInt // Track the lowest hash code seen.
    }

    val signature2 = for (i <- 0 until numHashes) yield {
      val hashCodeRDD = colHashed2.map(ele => (coeffA(i) * ele + coeffB(i)) % nextPrime)
      hashCodeRDD.min.toInt // Track the lowest hash code seen
    }

    val count = (0 until numHashes)
      .map(k => if (signature1(k) == signature2(k)) 1 else 0)
      .fold(0)(_ + _)


    val jSimilarity = count / numHashes.toDouble
    jSimilarity
  }


  //  def approach1(list1Values: List[String], list2Values: List[String]) = {
  //    val colHashed1 = list1Values.toSet
  //    val colHashed2 = list2Values.toSet
  //
  //    val jSimilarity = colHashed1.intersection(colHashed2).distinct.count / (colHashed1.union(colHashed2).distinct.count.toDouble)
  //    jSimilarity
  //  }


  def approach1(list1Values: List[String], list2Values: List[String]) = {
    val colHashed1 = list1Values.toSet
    val colHashed2 = list2Values.toSet

    val jSimilarity = (colHashed1 & colHashed2).size / (colHashed1 ++ colHashed2).size.toDouble
    jSimilarity
  }

  def main(args: Array[String]) {

    val list1Values = List("a", "b", "c")
    val list2Values = List("a", "b", "d")

    for (i <- 0 until 5) {
      println(s"Iteration ${i}")
      println(s" - Approach 1: ${approach1(list1Values, list2Values)}")
      println(s" - Approach 2: ${approach2(list1Values, list2Values)}")
    }

  }
}

输出 :
Iteration 0
 - Approach 1: 0.5
 - Approach 2: 0.5
Iteration 1
 - Approach 1: 0.5
 - Approach 2: 0.5
Iteration 2
 - Approach 1: 0.5
 - Approach 2: 0.8
Iteration 3
 - Approach 1: 0.5
 - Approach 2: 0.8
Iteration 4
 - Approach 1: 0.5
 - Approach 2: 0.4

你为什么要使用它?

关于scala - 与普通方法相比,通过最小散列计算的 Spark Jaccard 相似度计算速度较慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36357931/

相关文章:

scala - 在scala中声明一个没有方括号的泛型类

rdd - Spark : AUC and PR curve 中二元分类的评估指标

apache-spark - Spark KMeans 聚类 : get the number of sample assigned to a cluster

python - pyspark,逻辑回归,如何获得各个特征的系数

scala - 如果编译时所有类型都被删除,Scala 如何进行类型匹配?

java - 我们如何在 Scala 中编写 maven2 插件?

python - 使用 Spark 压缩文件

apache-spark - 从结构数组中选择 Spark DataFrames 中的特定列

scala - 如何从有界泛型类型中获取类

apache-spark - 推荐系统在生产中如何工作?