我有一些中间数据需要存储在 HDFS 和本地。我正在使用 Spark 1.6。在作为中间形式的 HDFS 中,我在 /output/testDummy/part-00000
和 /output/testDummy/part-00001
中获取数据。我想使用 Java/Scala 将这些分区保存在本地,这样我就可以将它们保存为 /users/home/indexes/index.nt
(通过在本地合并)或 /users/home/indexes/index-0000.nt
和 /home/indexes/index-0001.nt
分开。
这是我的代码:
注意:testDummy 与 test 相同,输出有两个分区。我想将它们单独存储或组合存储,但在本地使用 index.nt
文件。我更喜欢分别存储在两个数据节点中。我正在使用集群并在 YARN 上提交 spark 作业。我还添加了一些评论、次数以及我得到的数据。我怎么办?感谢您的帮助。
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
println("testDummy done") //1 time print
def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
println("iter size"+iterator.size) // 2 735 2 735 values
val filenamesWithExtension = outputPath + "/index.nt"
println("filenamesWithExtension "+filenamesWithExtension.length) //4 times
var list = List[(String)]()
val fileWritter = new FileWriter(filenamesWithExtension,true)
val bufferWritter = new BufferedWriter(fileWritter)
while (iterator.hasNext){ //iterator.hasNext is false
println("inside iterator") //0 times
val dat = iterator.next()
println("datadata "+iterator.next())
bufferWritter.write(dat + "\n")
bufferWritter.flush()
println("index files written")
val dataElements = dat.split(" ")
println("dataElements") //0
list = list.::(dataElements(0))
list = list.::(dataElements(1))
list = list.::(dataElements(2))
}
bufferWritter.close() //closing
println("savesData method end") //4 times when coal=2
list.iterator
}
println("before saving data into local") //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions) //2
println("testRDD size "+test.collect().length) //0
println("after saving data into local") //1
中得到任何东西
最佳答案
一些事情:
- 如果您打算稍后使用数据,请不要调用
Iterator.size
。迭代器
是TraversableOnce
.计算Iterator
大小的唯一方法是遍历它的所有元素,然后就没有更多的数据要读取了。 - 不要使用像
mapPartitions
这样的转换来产生副作用。如果您想执行某种类型的 IO,请使用foreach
/foreachPartition
等操作。这是一种不好的做法,并不能保证给定的代码片段只会执行一次。 - Action 或转换中的本地路径是特定工作人员的本地路径。如果您想直接在客户端机器上写入,您应该先使用
collect
或toLocalIterator
获取数据。写入分布式存储并稍后获取数据可能会更好。
关于java - 使用带迭代器的 mapPartition 保存 spark RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38044231/