scala - 与普通方法相比,通过最小散列计算的 Spark Jaccard 相似度计算速度较慢

给定 2 个巨大的值列表,我正在尝试计算 jaccard similarity它们之间使用 Scala 在 Spark 中。

假设 colHashed1包含第一个值列表和 colHashed2包含第二个列表。


val jSimilarity = colHashed1.intersection(colHashed2).distinct.count/(colHashed1.union(colHashed2).distinct.count.toDouble)


我已经使用了解释的方法 here .

def getCRC32 (s : String) : Int =
    val crc=new CRC32
    return crc.getValue.toInt & 0xffffffff

val maxShingleID = Math.pow(2,32)-1
def pickRandomCoeffs(kIn : Int) : Array[Int] =
  var k = kIn
  val randList = Array.fill(k){0}

  while(k > 0)
    // Get a random shingle ID.

    var randIndex = (Math.random()*maxShingleID).toInt

    // Ensure that each random number is unique.
      randIndex = (Math.random()*maxShingleID).toInt

    // Add the random number to the list.
    k = k - 1
    randList(k) = randIndex

   return randList

val colHashed1 = => getCRC32(a))
val colHashed2 = => getCRC32(a))

val nextPrime = 4294967311L
val numHashes = 10

val coeffA = pickRandomCoeffs(numHashes)
val coeffB = pickRandomCoeffs(numHashes)

var signature1 = Array.fill(numHashes){0}
for (i <- 0 to numHashes-1)
    // Evaluate the hash function.
    val hashCodeRDD = => ((coeffA(i) * ele + coeffB(i)) % nextPrime))

    // Track the lowest hash code seen.
    signature1(i) = hashCodeRDD.min.toInt

var signature2 = Array.fill(numHashes){0}
for (i <- 0 to numHashes-1)
    // Evaluate the hash function.
    val hashCodeRDD = => ((coeffA(i) * ele + coeffB(i)) % nextPrime))

    // Track the lowest hash code seen.
    signature2(i) = hashCodeRDD.min.toInt

var count = 0
// Count the number of positions in the minhash signature which are equal.
for(k <- 0 to numHashes-1)
  if(signature1(k) == signature2(k))
    count = count + 1
val jSimilarity = count/numHashes.toDouble

就时间而言,方法 1 似乎总是优于方法 2。当我分析代码时,min() RDD 上的函数调用在方法 2 中需要花费大量时间,并且根据使用的散列函数的数量,该函数被多次调用。

与重复的 min() 函数调用相比,方法 1 中使用的交集和并集操作似乎工作得更快。

我不明白为什么 minHashing 在这里没有帮助。与普通方法相比,我希望 minHashing 工作得更快。我在这里做错了什么吗?



JaccardSimilarity 与 MinHash 没有给出一致的结果:


object Jaccard {
  def getCRC32(s: String): Int = {
    val crc = new CRC32
    return crc.getValue.toInt & 0xffffffff

  def pickRandomCoeffs(kIn: Int, maxShingleID: Double): Array[Int] = {
    var k = kIn
    val randList = Array.ofDim[Int](k)

    while (k > 0) {
      // Get a random shingle ID.
      var randIndex = (Math.random() * maxShingleID).toInt
      // Ensure that each random number is unique.
      while (randList.contains(randIndex)) {
        randIndex = (Math.random() * maxShingleID).toInt
      // Add the random number to the list.
      k = k - 1
      randList(k) = randIndex
    return randList

  def approach2(list1Values: List[String], list2Values: List[String]) = {

    val maxShingleID = Math.pow(2, 32) - 1

    val colHashed1 = => getCRC32(a))
    val colHashed2 = => getCRC32(a))

    val nextPrime = 4294967311L
    val numHashes = 10

    val coeffA = pickRandomCoeffs(numHashes, maxShingleID)
    val coeffB = pickRandomCoeffs(numHashes, maxShingleID)

    val signature1 = for (i <- 0 until numHashes) yield {
      val hashCodeRDD = => (coeffA(i) * ele + coeffB(i)) % nextPrime)
      hashCodeRDD.min.toInt // Track the lowest hash code seen.

    val signature2 = for (i <- 0 until numHashes) yield {
      val hashCodeRDD = => (coeffA(i) * ele + coeffB(i)) % nextPrime)
      hashCodeRDD.min.toInt // Track the lowest hash code seen

    val count = (0 until numHashes)
      .map(k => if (signature1(k) == signature2(k)) 1 else 0)
      .fold(0)(_ + _)

    val jSimilarity = count / numHashes.toDouble

  //  def approach1(list1Values: List[String], list2Values: List[String]) = {
  //    val colHashed1 = list1Values.toSet
  //    val colHashed2 = list2Values.toSet
  //    val jSimilarity = colHashed1.intersection(colHashed2).distinct.count / (colHashed1.union(colHashed2).distinct.count.toDouble)
  //    jSimilarity
  //  }

  def approach1(list1Values: List[String], list2Values: List[String]) = {
    val colHashed1 = list1Values.toSet
    val colHashed2 = list2Values.toSet

    val jSimilarity = (colHashed1 & colHashed2).size / (colHashed1 ++ colHashed2).size.toDouble

  def main(args: Array[String]) {

    val list1Values = List("a", "b", "c")
    val list2Values = List("a", "b", "d")

    for (i <- 0 until 5) {
      println(s"Iteration ${i}")
      println(s" - Approach 1: ${approach1(list1Values, list2Values)}")
      println(s" - Approach 2: ${approach2(list1Values, list2Values)}")


输出 :
Iteration 0
 - Approach 1: 0.5
 - Approach 2: 0.5
Iteration 1
 - Approach 1: 0.5
 - Approach 2: 0.5
Iteration 2
 - Approach 1: 0.5
 - Approach 2: 0.8
Iteration 3
 - Approach 1: 0.5
 - Approach 2: 0.8
Iteration 4
 - Approach 1: 0.5
 - Approach 2: 0.4


