performance - Apache Spark: map vs mapPartitions?

Tags: performance scala apache-spark rdd

What's the difference between an RDD's map and mapPartitions methods? And does flatMap behave like map or like mapPartitions? Thanks.

(edit) i.e., what is the difference (either semantically or in terms of execution) between

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

and:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }

Best Answer

Important tip:

Whenever you have heavyweight initialization that should be done once for many RDD elements rather than once per RDD element, and if this initialization cannot be serialized (so that Spark can transmit it across the cluster to the worker nodes), such as the creation of objects from a third-party library, use mapPartitions() instead of map(). mapPartitions() lets the initialization be done once per worker task/thread/partition instead of once per RDD data element. For example, see below:

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection // one connection per partition, not per record

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // eagerly consumes the iterator, so every readMatchingFromDB call
            // happens before the connection is closed below

  connection.close() // safe: the partition has already been materialized
  newPartition.iterator // return a fresh iterator over the materialized results
})

Q2. Does flatMap behave like map or like mapPartitions?

It behaves like map: flatMap also works at the per-element level. Please see Example 2 for flatMap below; it is self-explanatory.
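For a quick illustration (a minimal spark-shell sketch added for clarity, not part of the original answer): flatMap invokes its function once per element, exactly like map; the only difference is that each call returns a collection whose elements are flattened into the result.

val r = sc.parallelize(List(1, 2, 3), 1)
r.map(x => List(x, x * 10)).collect
res0: Array[List[Int]] = Array(List(1, 10), List(2, 20), List(3, 30))
r.flatMap(x => List(x, x * 10)).collect
res1: Array[Int] = Array(1, 10, 2, 20, 3, 30)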

Q1. What's the difference between an RDD's map and mapPartitions?

map applies the supplied function at the per-element level, while mapPartitions applies it at the partition level.

Example scenario: if we have 100K elements in a particular RDD partition, then when we use map, the function used by the mapping transformation will be fired 100K times.

Conversely, if we use mapPartitions, we call the function only once, but we pass in all 100K records and get back all responses in a single function call.

There is a performance gain because map exercises the function so many times, especially if the function does something expensive on each call that it would not need to do if we passed in all the elements at once (as in the case of mapPartitions).
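To make this concrete, here is a minimal sketch (not from the original answer; it assumes a spark-shell session and the Spark 2.x longAccumulator API) that counts how many times each function is invoked:

val rdd = sc.parallelize(1 to 100, 4)

val mapCalls = sc.longAccumulator("mapCalls")
rdd.map { x => mapCalls.add(1); x * 2 }.count()
mapCalls.value // 100: the function ran once per element

val partitionCalls = sc.longAccumulator("partitionCalls")
rdd.mapPartitions { iter => partitionCalls.add(1); iter.map(_ * 2) }.count()
partitionCalls.value // 4: the function ran once per partition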

map

Applies a transformation function on each item of the RDD and returns the result as a new RDD.

Listing Variants

def map[U: ClassTag](f: T => U): RDD[U]

Example:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

mapPartitions

This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.
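For example (a hypothetical sketch, not from the original answer): on a hash-partitioned pair RDD whose keys are left untouched, passing preservesPartitioning = true keeps the partitioner on the result, so a later key-based operation can avoid a shuffle.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
              .partitionBy(new HashPartitioner(2))

// values change but keys do not, so the existing hash partitioning stays valid
val scaled = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 10) },
  preservesPartitioning = true)

scaled.partitioner // Some(HashPartitioner); with the default (false) it would be None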

Listing Variants

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Example 1

val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res ::= (pre, cur)
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

Example 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
def myfunc(iter: Iterator[Int]): Iterator[Int] = {
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
  }
  res.iterator
}
x.mapPartitions(myfunc).collect
// some numbers do not appear in the output at all, because the random count generated for them was zero
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)

The above program can also be written using flatMap, as follows.

Example 2 using flatMap

val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

Conclusion:

The mapPartitions transformation is faster than map because it calls your function once per partition rather than once per element.

Further reading: foreach vs foreachPartitions: when to use what?

This Q&A, "performance - Apache Spark: map vs mapPartitions?", is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/21185092/
