What is the difference between an RDD's map and mapPartitions methods? And does flatMap behave like map or like mapPartitions? Thanks.
(edit) i.e. what is the difference (either semantically or in terms of execution) between
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
and:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
Best answer
Important tip:
Whenever you have heavyweight initialization that should be done once for many RDD elements rather than once per RDD element, and if this initialization, such as creation of objects from a third-party library, cannot be serialized (so that Spark can transmit it across the cluster to the worker nodes), use mapPartitions() instead of map(). mapPartitions() provides for the initialization to be done once per worker task/thread/partition instead of once per RDD data element. For example, see below.
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2. Does flatMap behave like map or like mapPartitions?
flatMap works at the element level, like map. See Example 2 for flatMap below; it is self-explanatory.
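As a quick illustration of that point (a minimal sketch assuming an existing SparkContext sc; the sample data is not from the original answer): like map, flatMap is invoked once per element, but each call may return zero or more output elements, which are flattened into the resulting RDD.
val words = sc.parallelize(List("hello world", "spark rdd"), 2)
words.map(_.split(" ")).collect      // Array(Array(hello, world), Array(spark, rdd))
words.flatMap(_.split(" ")).collect  // Array(hello, world, spark, rdd)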
Q1. What's the difference between an RDD's map and mapPartitions?
map applies the function at the per-element level, while mapPartitions applies the function at the partition level.
Example scenario: if we have 100K elements in a particular RDD partition, then when we use map, the function used by the mapping transformation will be fired 100K times.
Conversely, if we use mapPartitions, we will call the particular function only once, but we will pass in all 100K records and get back all the responses in one function call.
Since map invokes the function that many times, mapPartitions can give a performance gain, especially if the function performs something expensive on each call that it would not need to repeat if we passed in all the elements at once (as is the case with mapPartitions).
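To make the call-count difference concrete, here is a minimal sketch (assuming an existing SparkContext sc; the data and partition count are illustrative, not from the original answer):
val rdd = sc.parallelize(1 to 100000, 4)   // 100K elements spread over 4 partitions

// map: the supplied function runs once per element (100,000 invocations)
val viaMap = rdd.map(x => x * 2)

// mapPartitions: the supplied function runs once per partition (4 invocations),
// receiving an Iterator over that partition's elements and returning a new Iterator
val viaPartitions = rdd.mapPartitions(iter => iter.map(_ * 2))

viaMap.count()         // 100000
viaPartitions.count()  // 100000 -- same result, different call granularity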
map
Applies a transformation function on each item of the RDD and returns the result as a new RDD.
Listing Variants
def map[U: ClassTag](f: T => U): RDD[U]
Example:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
This is a specialized map that is called only once per partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose.
preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.
Listing Variants
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Example 1
val a = sc.parallelize(1 to 9, 3)
// Pairs each element with its successor within a partition (elements never
// cross partition boundaries, which is why (3,4) and (6,7) are missing below).
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (pre, cur) :: res   // prepend the new pair to the result list
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Example 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
// Repeats each element a random number of times (0 to 9) within its partition.
def myfunc(iter: Iterator[Int]): Iterator[Int] = {
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
  }
  res.iterator
}
x.mapPartitions(myfunc).collect
// some of the numbers are not output at all, because the random count generated for them was zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
The above program can also be written using flatMap, as shown below.
Example 2 using flatMap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Conclusion:
The mapPartitions transformation is faster than map because it calls your function once per partition, not once per element.
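As a further illustration of that conclusion, here is a minimal sketch (the date-string RDD and partition count are illustrative assumptions, not part of the original answer) where per-record setup is hoisted out so it runs once per partition:
import java.text.SimpleDateFormat

val dates = sc.parallelize(List("2014-01-16", "2014-01-17", "2014-01-18", "2014-01-19"), 2)

// With map, a new SimpleDateFormat would be built for every record.
// With mapPartitions, it is built once per partition and reused for all records in it.
val parsed = dates.mapPartitions { iter =>
  val fmt = new SimpleDateFormat("yyyy-MM-dd")   // setup done once per partition
  iter.map(s => fmt.parse(s).getTime)            // reused for each element
}
parsed.collect()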
Regarding "performance - Apache Spark: map vs mapPartitions?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/21185092/