python - pyspark performance vs pure python for a simple sum

Tags: python pyspark

I did a simple performance comparison of pyspark vs pure python on an Ubuntu 16.04 virtual machine with 4 CPUs. Spark is running as a local installation on that VM.

#!/home/python3/venv/bin/python3
import pyspark
from pyspark.sql import SparkSession
from operator import add
from datetime import datetime

spark = SparkSession.builder.appName('ai_project').getOrCreate()
len = 1000000000  # number of elements to sum (note: shadows the built-in len)

# Spark sum: parallelize the range into an RDD and fold with operator.add
for i in range(1):
    start = datetime.now()
    print("start:", start)
    t = spark.sparkContext.parallelize(range(len))
    a = t.fold(0, add)
    print(a)
    end = datetime.now()
    print("end for spark sum:", end, end - start)

# Pure python sum: plain loop on a single core
s = 0
start = datetime.now()
print("start for loop sum:", start)
for i in range(len):
    s = s + i
print("sum=", s)
end = datetime.now()
print("end for loop sum:", end, end - start)

Here is the output:

(venv) cju@ubuntu-16:~/cloudmap3-ai-datasets/examples$ ./t1.py 
20/05/15 10:22:34 WARN Utils: Your hostname, ubuntu-16 resolves to a loopback address: 127.0.1.1; using 192.168.2.113 instead (on interface enp0s3)
20/05/15 10:22:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/05/15 10:22:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
start: 2020-05-15 10:22:36.771454
49999999995000000000                                                            
end for spark sum: 2020-05-15 10:36:34.811598 0:13:58.040144
start for loop sum: 2020-05-15 10:36:34.811630
sum= 49999999995000000000
end for loop sum: 2020-05-15 10:47:53.770664 0:11:18.959034

My questions are:

 1. The pyspark code took 13:58 minutes and the pure python code took
    11:18 minutes. While the pyspark code is running, all 4 CPUs are at
    100% utilization; while the pure python code is running, only 1 CPU
    is at 100% utilization. I would expect the pyspark code to take much
    less time.
 2. Another question is about the add operator. Is the add operation
    done in a Python process with the result communicated back to the
    JVM, or is it done in the JVM of the worker process?
 3. add is not a UDF, so it should be running in the JVM rather than in
    a Python process. Could someone explain this in more detail?

Changing fold to reduce gives the same result. I also tried using a DataFrame to do the sum, but that was even worse: while the DataFrame sum is running, only one CPU is at 100%, and the DataFrame approach uses more memory and is slower. Here is the comparison (a sketch of the DataFrame version follows the table):

--------------------------------------------------------
| len  | pyspark rdd | pyspark dataframe | pure python |
--------------------------------------------------------
| 10^7 | 1.39 sec    | 16.36 sec         | 0.67 sec    |
--------------------------------------------------------
| 10^8 | 7.02 sec    | out of memory     | 6.94 sec    |
--------------------------------------------------------
| 10^9 | 13:58 min   | out of memory     | 11:18 min   |
--------------------------------------------------------
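
Since the DataFrame code itself isn't shown above, here is a minimal sketch of what that version might look like (the use of spark.range and a sum over its id column is an assumption; the size n is illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('ai_project').getOrCreate()

n = 10 ** 7                                  # 10^8 and 10^9 ran out of memory in the table above
df = spark.range(n)                          # single-column DataFrame with column 'id'
total = df.agg(F.sum('id')).collect()[0][0]  # aggregate on the executors, collect one row
print(total)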

Best Answer

If you compare Spark's performance against plain Python or pandas for small operations, pandas will always outperform Spark.

Spark is a distributed processing engine meant for very large data volumes (on the order of 10 GB and up), because Spark distributes the data across all cores and processes the partitions separately.

For your sum operation, pure Python runs faster because it processes everything on a single core with no extra machinery, whereas Spark first distributes the data, then processes each partition, and then reduces the partial results back to a single node. For such a small operation, that distribution and reduction is a lot of overhead by itself.
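
As a quick way to see how the work is split in local mode, a sketch using standard RDD methods (the range size here is illustrative):

# Inspect how Spark splits the fold/reduce across local cores.
rdd = spark.sparkContext.parallelize(range(10 ** 7))
print(spark.sparkContext.defaultParallelism)  # typically the number of local cores
print(rdd.getNumPartitions())                 # partitions the fold/reduce is split across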

If you want more performance for this kind of operation, I would suggest using NumPy, since it is C-based and performs vectorized computation.
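
For example, a minimal NumPy sketch of the same sum (assuming the whole array fits in memory; at 10^9 elements an int64 array is roughly 8 GB, so you may want to sum in chunks at that size):

import numpy as np

n = 10 ** 8
total = np.arange(n, dtype=np.int64).sum()  # vectorized sum in C, single pass
print(total)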

Regarding python - pyspark performance vs pure python for a simple sum, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61811453/
