python - 理论上,对于 Apache Spark,Scala 比 Python 更快。实际上并非如此。这是怎么回事?

标签 python scala apache-spark dataframe rdd

大家好。我会尽力解释我的问题,以便您能理解我。

在几个地方我发现它认为 Scala 比 Python 更快:

此外,据说Scala是最适合在Apache Spark中运行应用程序的编程语言:

https://www.dezyre.com/article/scala-vs-python-for-apache-spark/213

但是,在此站点上,另一位用户 (@Mrityunjay) 提出了一个与我在此提出的问题类似的问题:

Spark performance for Scala vs Python

在这篇文章中,@zero323 的回复强调了以下内容:

  1. @zero323 展示了用 Scala 编写的程序与用 Python 编写的程序在性能上的巨大差异。
  2. @zero323 解释了 ReduceByKey 等操作的使用如何显着影响 Spark 应用程序的性能。
  3. @zero323 将 ReduceByKey 操作替换为 GroupByKey 操作,因此他可以提高@Mrityunjay 提出的程序的性能。

一般来说,回复解释是异常(exception)的,并且通过@zero323 的修改在 Scala 和 Python 之间实现了非常相似的执行时间。

考虑到这些信息,我给自己编写了一个简单的程序来解释我的应用程序中发生的类似情况,强调我在 Scala 中编写的代码比在 Scala 中编写的代码慢Python。为此,我避免使用 ReduceByKey 操作,只使用 map 操作。

我会尝试做任何 super 复杂的操作来最大化集群占用率(96 核,48 GB RAM)并实现大延迟。为此,代码生成了一组100万条人工数据(唯一目的是计算处理100万条数据的执行时间,无论是否复制),其中包含一个标识符ID,一个长度为10的向量DoubleS 的。

由于我的应用是用DataFrame实现的,所以我用Scala写了两个程序,一个用RDD,一个用DataFrame,目的是观察是不是DataFrame的问题。同样,用 Python 编写了一个等效程序。

通常,对每个 RDD/DataFrame 记录应用一个操作,其结果放在一个附加字段中,产生一个包含原始字段的新 RDD/DataFrame 和一个包含结果的新字段。

这是 Scala 中的代码:

import org.apache.spark.sql.SparkSession
import scala.math.BigDecimal

object RDDvsDFMapComparison {
  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("Test").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    val parts = 96
    val repl = 1000000
    val rep = 60000000

    val ary = (0 until 10).toArray
    val m = Array.ofDim[Int](repl, ary.length)
    for (i <- 0 until repl)
      m(i) = ary

    val t1_start = System.nanoTime()
    if (args(0).toInt == 0) {
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1)).toDF("Name", "Data")
      val c1 = b1.map { x =>
        val name = x.getString(0)
        val data = x.getSeq[Int](1).toArray
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }.toDF("Name", "Data", "Mean")
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    } else {
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1))
      val c1 = b1.map { x =>
        val name = x._1
        val data = x._2
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    }
    val t1_end = System.nanoTime()
    val t1 = t1_end - t1_start
    println("Map operation elapses: " + BigDecimal(t1.toDouble / 1000000000).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble.toString + " seconds.")
  }
}

这是 Python 中的代码(简单得多):

#!/usr/bin/python
# -*- coding: latin-1 -*-

import sys
import time
import math
from pyspark import SparkContext, SparkConf

def myop(key, value):
  s = 0.0
  for j in range(r):
    s += math.exp(math.log(sum(value)) / math.log(float(len(value))))
  return (key, value, s)

if __name__ == "__main__":
  conf = SparkConf().setAppName("rddvsdfmapcomparison")
  sc = SparkContext(conf=conf)
  parts = 96
  repl = 1000000
  r = 60000000
  ary = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  m = []
  for i in range(repl): m.append(ary)
  start = time.time()
  a2 = sc.parallelize(m, parts)
  b2 = a2.zipWithIndex().map(lambda (value, key): (key, value))
  c2 = b2.map(lambda (key, value): myop(key, value))
  c2.count
  d2 = c2.take(5)
  print '[%s]' % ', '.join(map(str, d2))
  end = time.time()
  print 'Elapsed time is', round(end - start, 2), 'seconds'
  sc.stop()

结果很明显。用 Python 实现的程序比用 Scala 实现的任何程序都要快,无论是使用 RDD 还是 DataFrame。还可以观察到,RDD 中的程序比 DataFrame 中的程序稍快,这是由于使用解码器提取 DataFrame 记录的每个字段的数据类型而保持一致。

问题是,我做错了什么? Scala 代码不是比 Python 快吗?有人可以向我解释我的代码中做错了什么吗?来自 @zero323 的响应非常好并且具有说明性,但我不明白像这样的简单代码在 Scala 中为什么会比在 Python 中慢。

非常感谢您花时间阅读我的问题。

最佳答案

在 Scala 中尝试这个实现。它更快:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("Test").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val parts = 96
val repl = 1000000
val rep = 20000

val m = Vector.tabulate(repl, 10)((_,i) => i)

val myop = udf( (value: Seq[Int]) =>
  (0 until rep).foldLeft(0.0) {(acc,_)=>
    acc + Math.exp(Math.log(value.sum) / Math.log(value.length))
  }
)

val c1 = sc.parallelize(m, parts)
  .toDF("Data")
  .withColumn("Name",monotonically_increasing_id())
  .withColumn("Mean",myop('Data))

c1.count()
val d1 = c1.take(5)
println(d1.deep.mkString(","))

如果我理解 myop 实际执行的功能,我认为它可能会更清晰。

编辑:

正如@user6910411 在评论中提到的那样,此实现速度更快只是因为它与 Python 的代码完全相同(跳过大部分计算)。问题中提供的原始 Scala 和 Python 实现不相等。

关于python - 理论上,对于 Apache Spark,Scala 比 Python 更快。实际上并非如此。这是怎么回事?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52713466/

相关文章:

python - 如何更改 numpy recarray 某些列的数据类型?

python - tf 和 tf.keras 密集层在我的设置中显示出完全不同的行为

apache-spark - 在 Spark Streaming 中读取 Hbase 数据

python - 在函数中调用 locals() 不直观?

python - 如何理解wxGridBagSizer?

scala - 函数式编程的非数值用例?

python - 为什么pyspark中两种不同的数据处理方式会产生不同的结果?

scala - 无法执行用户定义的函数($anonfun$9 : (string) => double) on using String Indexer for multiple columns

scala - 如何获得List元素之间的最小间距?

scala - 副作用是纯函数中找不到的一切吗?