apache-spark - Spark - First() 操作的行为

标签 apache-spark pyspark

我试图了解 Spark 为简单的first() 与collect() 操作创建的作业。

给定代码:

myRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])

def func(d):
    return d + '-foo'

myRDD = myRDD.map(func)

我的 RDD 分为 16 个分区:

print(myRDD.toDebugString())
(16) PythonRDD[24] at RDD at PythonRDD.scala:48 []
 |   ParallelCollectionRDD[23] at parallelize at PythonRDD.scala:475 []

如果我打电话:

myRDD.collect()

我得到了 1 份工作,创建了 16 个任务。我假设这是每个分区的一项任务。

但是,如果我打电话:

myRDD.first()

我获得了 3 份工作,创建了 1、4 和 11 个任务。为什么创造了 3 个就业岗位?

我正在使用由 Mesos 配置的单个 16 核执行器运行 Spark-2.0.1。

最佳答案

这实际上是非常聪明的 Spark 行为。您的 map() 是转换(它是惰性评估的),并且 first()collect() 都是操作(终端操作)。所有转换都会在您调用操作时应用于数据。

当您调用 first() 时,spark 会尝试执行尽可能少的操作(转换)。首先,它尝试一个随机分区。如果没有结果,则再进行4次分区并计算。同样,如果没有结果,spark 会再进行 4 次分区 (5 * 4),并再次尝试获取任何结果。

在您的第三次尝试中,您只有 11 个未触及的分区 (16 - 1 - 4)。如果 RDD 中的数据较多或分区数量较少,spark 可能会更快找到 first() 结果。

关于apache-spark - Spark - First() 操作的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40082417/

相关文章:

python - Pyspark 数据帧按字母顺序拆分并写入 S3

python - 将数据框列和外部列表传递给 withColumn 下的 udf

apache-spark - 来自 RDD 的 PySpark LDA 模型密集向量

apache-spark - 在 spark-submit 中为 spark 应用程序提供带有空格的参数/arg 的正确方法

java - Spark 和 Cassandra Java 应用程序 : Exception in thread "main" java. lang.NoClassDefFoundError: org/apache/spark/sql/Dataset

amazon-s3 - 检查 Databricks 笔记本中是否存在 S3 目录

azure - 如何使用pyspark以表格形式打印StringType()的 "dictionary"

apache-spark - 在 delta lake 中高效读取/转换分区数据

azure - 是否可以将 Databricks 笔记本安装到类似于库的集群中?

java - 如何累积运行 Spark sql 聚合器?