scala - 为什么 spark 排序比 scala 原始排序方法慢

标签 scala sorting apache-spark

听很多人说Spark擅长排序和分布式计算。目前,我们的团队正在研究 spark 和 scala。我们将在 spark 上实现一个排序服务。现在,我已经设置了 spark 集群,并尝试在 spark 集群上运行和排序示例,但是排序的成本时间似乎很长。这是我的代码。

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Created by  on 1/1/15.
 */
object AdvancedSort {
  /**
   * bin/spark-submit --master spark://master:7077 --executor-memory 1024M --class com.my.sortedspark.AdvancedSort lib/sortedspark.jar 100000 3
   * @param args
   */
  def main(args: Array[String]) {
    val sampleSize = if (args.length > 0) args(0).toInt else 100000
    val slice = if (args.length > 1) args(1).toInt else 3

    sort(sampleSize, slice)
  }

  def sort(listSize: Int, slice: Int): Unit = {
    val conf = new SparkConf().setAppName(getClass.getName)
    val spark = new SparkContext(conf)
    val step1 = System.currentTimeMillis()
    val data = genRandom(listSize)
    val step2 = System.currentTimeMillis()
    println(">>>>>>>>>> genRandom : " + (step2 - step1))

    val distData = spark.parallelize(data, slice)
    val step3 = System.currentTimeMillis()
    println(">>>>>>>>>> parallelize : " + (step3 - step2))

    val result = distData.sortBy(x => x, true).collect
    val step4 = System.currentTimeMillis()
    println(">>>>>>>>>> sortBy and collect: " + (step4 - step3))
    println(">>>>>>>>>> total time : " + (step4 - step1))

    printlnArray(result, 0, 10)

    spark.stop()
  }

  /**
   * generate random number
   * @return
   */
  def genRandom(listSize: Int): List[Int] = {
    val range = 100000
    var listBuffer = new ListBuffer[Int]
    val random = new Random()
    for (i <- 1 to listSize) listBuffer += random.nextInt(range)
    listBuffer.toList
  }

  def printlnList(list: List[Int], start: Int, offset: Int) {
    for (i <- start until start + offset) println(">>>>>>>>> list : " + i + " | " + list(i))
  }

  def printlnArray(list: Array[Int], start: Int, offset: Int) {
    for (i <- start until start + offset) println(">>>>>>>>> list : " + i + " | " + list(i))
  }
}

将上述代码部署到spark集群后,我在Master的Spark Home下运行如下命令:

bin/spark-submit --master spark://master:7077 --executor-memory 1024M --class com.my.sortedspark.AdvancedSort lib/sortedspark.jar 100000 3

以下是我最终得到的花费时间。

>>>>>>>>>> genRandom : 86
>>>>>>>>>> parallelize : 53
>>>>>>>>>> sortBy and collect: 6756

这看起来很奇怪,因为如果我在我的本地机器上通过 scala 的排序方法运行 100000 个 Int 的随机数据,成本时间比 spark 更快。

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Created by  on 1/5/15.
 */
object ScalaSort {
  def main(args: Array[String]) {
    val list = genRandom(1000000)
    val start = System.currentTimeMillis()
    val result = list.sorted
    val end = System.currentTimeMillis()
    println(">>>>>>>>>>>>>>>>>> cost time : " + (end - start))
  }

  /**
   * generate random number
   * @return
   */
  def genRandom(listSize: Int): List[Int] = {
    val range = 100000
    var listBuffer = new ListBuffer[Int]
    val random = new Random()
    for (i <- 1 to listSize) listBuffer += random.nextInt(range)
    listBuffer.toList
  }
}

scala sorted 方法在本地机器上的耗时

>>>>>>>>>>>>>>>>>> cost time : 169

在我看来,服装 Spark 的排序时间有以下几个因素:

  1. Master和Worker之间的数据传输

  2. 在 Worker 上排序很快,通过合并可能会很慢。

有spark高手知道为什么会这样吗?

最佳答案

Spark 专为大数据而生。 当您向其中插入很小的数字时,它的运行速度会变慢,因为在所有核心/集群上的分布比正常排序所花费的时间要多。 尝试使用更大的数据或代替 Spark 在 Scala 中使用 ParCollections :

collection.par.<any code here>

关于scala - 为什么 spark 排序比 scala 原始排序方法慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27778210/

相关文章:

java - 使用企业 Java Bean 提升

java - 如何最有效地计算俄罗斯方 block 堆栈的高度剖面?

ruby-on-rails - 表轨中 ActiveRecord::Associations::CollectionProxy 的排序顺序

apache-spark - 如何在Mac上使用Homebrew安装apache-spark 2.2.0

scala - 案例类修改和设计通用方式

scala - 从 Spark DataFrame 中删除仅满足两个条件的行

python - 循环将列表拆分为不同的子列表

javascript - 如何在 JavaScript 中按多列对多维数组进行排序?

python - 使用pyspark中的函数进行行操作

scala - 使用 Spark Scala 计算平均值