我是 Spark 新手,有一些与 Spark RDD 操作和创建相关的问题:
val rdd1 = sc.parallelize(List("yellow","red","blue","cyan","black"),3)
val mapped = rdd1.mapPartitionsWithIndex{(index, iterator) => {
println("Called in Partition -> " + index)
val myList = iterator.toList
myList.map(x => x + " -> " + index).iterator
}
}
上面代码末尾的.iterator
有什么用?它将列表转换为迭代器吗?列表本身不就是迭代器吗,为什么我们最后还需要这个操作呢?
另外,为什么这比普通的 map() 函数更快?由于分区中的每个元素再次使用 map(x => x + "-> "+ index)
函数,这难道不是逐个元素工作的另一种方式吗?
另一件事,我想通过一次读取文件 4 行来创建 RDD。我在 Scala 中有以下代码:
val hconf = new org.apache.hadoop.conf.Configuration
hconf.set("mapreduce.input.lineinputformat.linespermap","4")
val line = sc.newAPIHadoopFile(inputFile,classOf[NLineInputFormat],classOf[LongWritable],classOf[Text],hconf).map(_._2.toString)
line.take(1).foreach(println)
但是输出仍然只打印一行。由于我已将 hconf
设置为读取 4 行,那么 RDD 中的每个元素不应该接收 inptFile 的 4 行吗?那么它不应该输出四行吗?
最佳答案
为什么使用.iterator?
mapPartitions
的函数参数是:
f: Iterator[T] => Iterator[U]
您粘贴的代码将每个迭代器转换为一个列表进行处理,并且需要在闭包末尾将其返回为迭代器以正确进行类型检查。 Spark 操作通常更喜欢流式传输数据,而不是一次将所有数据都存储在内存中,并且强制分区包含迭代器是该模型的一部分。
关于你的“列表是一个迭代器”断言,这并不完全正确——虽然List
是一个Iterable
,但它不是一个Iterator
。 Iterator
的特殊之处在于它们只能使用一次,因此它们不支持许多标准的 scala Collection 操作。 Iterator
和 Iterable
之间的主要区别在于这种“一次性”模型:一个 Iterable[T]
可以生成一个 Iterator[T ]
根据需要多次查看,但如果您只有一个 Iterator[T]
,则只能查看一次。
更高效的无列表实现
您粘贴的代码效率极低。您最终将所有数据复制到一个列表中,然后从该列表生成一个迭代器。您可以只映射迭代器:
val rdd1 = sc.parallelize(List("yellow","red","blue","cyan","black"),3)
val mapped = rdd1.mapPartitionsWithIndex{(index, iterator) => {
println("Called in Partition -> " + index)
iterator.map(x => x + " -> " + index)
}
}
每张 map 的线路
我认为您可能在这里设置了错误的配置参数。请参阅this question寻找可能的解决方案。
关于scala - 使用 Spark 分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40322392/