scala - 使用 Spark 分区

标签 scala apache-spark

我是 Spark 新手,有一些与 Spark RDD 操作和创建相关的问题:

val rdd1 =  sc.parallelize(List("yellow","red","blue","cyan","black"),3)
val mapped =   rdd1.mapPartitionsWithIndex{(index, iterator) => {
                                println("Called in Partition -> " + index)
                                val myList = iterator.toList
                                myList.map(x => x + " -> " + index).iterator
                               }
                            }

上面代码末尾的.iterator有什么用?它将列表转换为迭代器吗?列表本身不就是迭代器吗,为什么我们最后还需要这个操作呢?
另外,为什么这比普通的 map() 函数更快?由于分区中的每个元素再次使用 map(x => x + "-> "+ index) 函数,这难道不是逐个元素工作的另一种方式吗?

另一件事,我想通过一次读取文件 4 行来创建 RDD。我在 Scala 中有以下代码:

val hconf = new org.apache.hadoop.conf.Configuration
hconf.set("mapreduce.input.lineinputformat.linespermap","4")
val line = sc.newAPIHadoopFile(inputFile,classOf[NLineInputFormat],classOf[LongWritable],classOf[Text],hconf).map(_._2.toString)
line.take(1).foreach(println)

但是输出仍然只打印一行。由于我已将 hconf 设置为读取 4 行,那么 RDD 中的每个元素不应该接收 inptFile 的 4 行吗?那么它不应该输出四行吗?

最佳答案

为什么使用.iterator?

mapPartitions 的函数参数是:

f: Iterator[T] => Iterator[U]

您粘贴的代码将每个迭代器转换为一个列表进行处理,并且需要在闭包末尾将其返回为迭代器以正确进行类型检查。 Spark 操作通常更喜欢流式传输数据,而不是一次将所有数据都存储在内存中,并且强制分区包含迭代器是该模型的一部分。

关于你的“列表是一个迭代器”断言,这并不完全正确——虽然List是一个Iterable,但它不是一个IteratorIterator 的特殊之处在于它们只能使用一次,因此它们不支持许多标准的 scala Collection 操作。 IteratorIterable 之间的主要区别在于这种“一次性”模型:一个 Iterable[T] 可以生成一个 Iterator[T ] 根据需要多次查看,但如果您只有一个 Iterator[T],则只能查看一次。

更高效的无列表实现

您粘贴的代码效率极低。您最终将所有数据复制到一个列表中,然后从该列表生成一个迭代器。您可以只映射迭代器:

val rdd1 =  sc.parallelize(List("yellow","red","blue","cyan","black"),3)
val mapped =   rdd1.mapPartitionsWithIndex{(index, iterator) => {
                            println("Called in Partition -> " + index)
                            iterator.map(x => x + " -> " + index)
                           }
                        }  

每张 map 的线路

我认为您可能在这里设置了错误的配置参数。请参阅this question寻找可能的解决方案。

关于scala - 使用 Spark 分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40322392/

相关文章:

java - 为什么PatriciaTrie中无法访问 `floorEntry`等方法?

scala - 使用elastic4s搜索整个URL

java - neo4j Java API getRelationships 方法的时间复杂度

generics - 如何获取 Scala 中类型的默认值?

apache-spark - 如何将 org.apache.spark.rdd.RDD[Array[Double]] 转换为 Spark MLlib 所需的 Array[Double]

apache-spark - Spark MLLib LDA结果解读

scala - Spark 斯卡拉 java.lang.NoSuchMethodError : while executing fat jar in CDH 5. 16

java - 在 Scala 中使用抽象成员类实现抽象的通用 Java 类

apache-spark - 将数据从 Spark-Streaming 存储到 Cassandra 时出现问题

sql - 窗口功能的默认窗口框架是什么