java - 使用带迭代器的 mapPartition 保存 spark RDD

标签 java scala hadoop apache-spark hdfs

我有一些中间数据需要存储在 HDFS 和本地。我正在使用 Spark 1.6。在作为中间形式的 HDFS 中,我在 /output/testDummy/part-00000/output/testDummy/part-00001 中获取数据。我想使用 Java/Scala 将这些分区保存在本地,这样我就可以将它们保存为 /users/home/indexes/index.nt(通过在本地合并)或 /users/home/indexes/index-0000.nt/home/indexes/index-0001.nt 分开。

这是我的代码: 注意:testDummy 与 test 相同,输出有两个分区。我想将它们单独存储或组合存储,但在本地使用 index.nt 文件。我更喜欢分别存储在两个数据节点中。我正在使用集群并在 YARN 上提交 spark 作业。我还添加了一些评论、次数以及我得到的数据。我怎么办?感谢您的帮助。

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1

PS:我关注了,thisthis但与我正在寻找的不完全相同,我以某种方式做了但没有在 index.nt

中得到任何东西

最佳答案

一些事情:

  • 如果您打算稍后使用数据,请不要调用 Iterator.size迭代器TraversableOnce .计算 Iterator 大小的唯一方法是遍历它的所有元素,然后就没有更多的数据要读取了。
  • 不要使用像 mapPartitions 这样的转换来产生副作用。如果您想执行某种类型的 IO,请使用 foreach/foreachPartition 等操作。这是一种不好的做法,并不能保证给定的代码片段只会执行一次。
  • Action 或转换中的本地路径是特定工作人员的本地路径。如果您想直接在客户端机器上写入,您应该先使用 collecttoLocalIterator 获取数据。写入分布式存储并稍后获取数据可能会更好。

关于java - 使用带迭代器的 mapPartition 保存 spark RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38044231/

相关文章:

hadoop - 无法处理 PIG 中的时间序列数据

hadoop - pig : filtering out empty string

scala - 我在函数中以 var 的形式发送一个变量,但我以 val 的形式接收它

scala - ENSIME 找不到正确的项目定义

scala - 服务于一项任务的多个 Scala 参与者

scala - Spark - 将 CSV 文件加载为 DataFrame?

java - 构建一个可以在移动和 Web 应用程序之间同时传输数据的服务器端 API

java - 使用selenium查找页面源

java - 在android 4中设置可滚动的壁纸

java - Object 类的 equals 方法 - 它是如何工作的 - Java