python - 为什么我的 Spark DataFrame 比 RDD 慢很多?

标签 python apache-spark dataframe pyspark apache-spark-sql

我有一个非常简单的 Spark DataFrame,当运行 DataFrame groupby 时,性能非常糟糕——比(在我看来)等效的 RDD reduceByKey 慢大约 8 倍...

我缓存的 DF 只有两列,客户和名称只有 50k 行:

== Physical Plan ==
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None

当我运行以下两个代码片段时,我希望得到类似的性能,而不是 rdd 版本在 10 秒内运行而 DF 版本在 85 秒内运行...

rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect()

rawtempDF2.groupby('name').count().collect()

我是否遗漏了一些真正基本的东西? FWIW,RDD 版本运行 54 个阶段,DF 版本为 227 :/

编辑:我使用的是 Spark 1.6.1 和 Python 3.4.2。 Edit2:此外,源 Parquet 被分区为客户/日期/名称 - 目前有 27 个客户,1 天,c。 45 个名字。

最佳答案

这两个数字似乎都相对较高,并且您并不完全清楚您是如何创建 DataFrame 或测量时间的,但总的来说,像这样的差异可以通过与数字相比较少的记录数来解释分区数。

spark.sql.shuffle.partitions 的默认值为 200,这是您获得的任务数。对于 50K 记录,启动任务的开销将高于您从并行执行中获得的加速。让我们用一个简单的例子来说明这一点。首先让我们创建一个示例数据:

import string
import random

random.seed(323)

def random_string():
  n = random.randint(3, 6)
  return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )

df = (sc
    .parallelize([random_string() for _ in range(50000)], 8).toDF(["name"])
    .cache())

并根据 shuffle.partitions 的数量测量时间:

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 504 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 451 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "100")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 624 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "200")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 778 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 1.75 s per loop

虽然这些值与您声称的值无法比较,并且这些数据是在本地模式下收集的,但您可以看到相对清晰的模式。这同样适用于 RDD:

from operator import add

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
## 10 loops, best of 3: 414 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
## 10 loops, best of 3: 439 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
## 10 loops, best of 3: 1.3 s per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
## 10 loops, best of 3: 8.41 s per loop

在适当的分布式环境中,由于网络 IO 的成本,这会更高。

只是为了比较,让我们检查一下在没有 Spark 的情况下在本地执行此任务需要多长时间

from collections import Counter

data = df.rdd.flatMap(lambda x: x).collect()

%timeit -n 10 Counter(data)
## 10 loops, best of 3: 9.9 ms per loop

您还应该查看数据局部性。根据您使用的存储和配置,这可能会给您的作业增加额外的延迟,即使是像这样的小输入。

关于python - 为什么我的 Spark DataFrame 比 RDD 慢很多?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38050140/

相关文章:

python - 使用 slack API 发布图片

architecture - Apache Spark 是否适用于大量小型快速计算和一些大型非交互式计算?

scala - 如何重命名 Scala 中 count() 函数生成的列

python - Flask 应用程序工厂模式与 Flask-Mail

python - seaborn FacetGrid : How to leave proper space on top for suptitle

apache-spark - 在 Databricks 上将 Spark.databricks.service.server.enabled 设置为 true 时到底会发生什么?

hadoop - 执行mapreduce作业时PySpark抛出错误

python - 从 Pandas 数据框中提取重复

python - 匹配 DataFrame 列中字符串中的独立单词

python - 从数据框或系列的 Pandas 输出中删除名称、数据类型