mapreduce - Apache Spark 何时发生混洗?

标签 mapreduce apache-spark

我正在优化 Spark 中的参数,并且想确切地了解 Spark 是如何打乱数据的。

准确地说,我有一个简单的字数统计程序,并且想知道spark.shuffle.file.buffer.kb如何影响运行时间。现在,当我将此参数设置得非常高时,我只会看到速度减慢(我猜测这会阻止每个任务的缓冲区同时装入内存)。

有人可以解释 Spark 如何执行归约吗?例如,数据在 RDD 中读取和分区,当调用“action”函数时,Spark 向工作节点发送任务。如果操作是减少,Spark 如何处理这个问题,以及 shuffle 文件/缓冲区与此过程有何关系?

最佳答案

问题:至于您关于Spark何时触发洗牌的问题?

答案:任何 joincogroupByKey 操作都涉及将对象保存在 HashMap 或内存中用于分组或排序的缓冲区。 joincogroupgroupByKey 在它们触发的 shuffle 的获取端阶段的任务中使用这些数据结构。 reduceByKeyaggregateByKey 在其触发的洗牌两侧的阶段的任务中使用数据结构。

说明:Spark 中的随机操作如何工作?

与 Hadoop 相比,Spark 中的 shuffle 操作的实现方式有所不同。我不知道您是否熟悉它如何与 Hadoop 配合使用,但现在让我们关注 Spark。

ma​​p端,Spark中的每个map任务都会为每个reducer写出一个shuffle文件(操作系统磁盘缓冲区)——它对应于Spark中的一个逻辑 block 。这些文件不是中间文件,因为 Spark 不会将它们合并到更大的分区文件中。由于 Spark 中的调度开销较小,因此映射器 (M) 和化简器 (R) 的数量远高于 Hadoop 中的数量。因此,将 M*R 文件传送到相应的缩减程序可能会导致巨大的开销。

与Hadoop类似,Spark也提供了一个参数spark.shuffle.compress来指定压缩库来压缩map输出。在本例中,它可以是 Snappy(默认情况下)或 LZF。 Snappy 对于每个打开的文件仅使用 33KB 缓冲区,显着降低了遇到内存不足错误的风险。

reduce 方面,Spark 要求所有已打乱的数据适合相应的reducer 任务的内存,而Hadoop 则相反,它可以选择将其溢出到磁盘。当然,这种情况只会发生在以下情况:例如,reducer 任务需要对 GroupByKey 或 ReduceByKey 操作的所有混洗数据进行排序。在这种情况下,Spark 会抛出内存不足异常,到目前为止,这对开发人员来说是一个不小的挑战。

此外,Spark 不存在重叠复制阶段,这与 Hadoop 不同,Hadoop 具有重叠复制阶段,其中映射器甚至在映射完成之前就将数据推送到缩减器。这意味着在 Spark 中,shuffle 是一种操作,而在 Hadoop 中是一种操作。每个 reducer 还应该维护一个网络缓冲区来获取映射输出。该缓冲区的大小通过参数 spark.reducer.maxMbInFlight 指定(默认情况下为 48MB)。

有关 Apache Spark 中洗牌的更多信息,我建议阅读以下内容:

关于mapreduce - Apache Spark 何时发生混洗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31386590/

相关文章:

hadoop - 编写用于计算记录数的 MApreduce 代码

python - 如何在 pyspark 管道中打印最佳模型参数

scala - Spark Scala TF-IDF 值排序向量

sql - 使用 Spark SQL 跳过/获取

python - PySpark 计算关联

hadoop - Pig Mapreduce 计算连续的字母

hadoop - 如何确定Hive中的动态分区数

电影推荐的 MapReduce Jaccard 相似度计算

javascript - 递归对象数组以附加属性子项计数

hadoop - Spark程序在群集上运行非常慢