看起来 RDD.take()
只是在序列文件的支持下重复读取的最后一个元素。
例如:
val rdd = sc.sequenceFile("records.seq", classOf[LongWritable], classOf[RecordWritable])
val records: Array[(LongWritable, RecordWritable)] = rdd.take(5)
System.out.println(records.map(_._2.toString).mkString("\n"))
输出:
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
即使我知道行是唯一的。
sc.binaryRecords()
也存在此问题。
我意识到这可能与 Hadoop 可写缓存问题有关,但是否有解决此问题的计划?有任何解决方法吗?
最佳答案
我尝试复制您的问题,是的,当直接对 sc.sequenceFile() 的结果调用 take 时,我也看到了类似的行为。但能够找到解决方法:
注意:我正在解释使用 LongWritable 和 Text 而不是 RecordWritable。我不确定 RecordWritable 所需的导入
我的序列文件有:(0,0) (1,1) (2,2) ...
val rdd = sc.sequenceFile("sequencefile.seq", classOf[LongWritable], classOf[Text])
val map = rdd.map(case (k,v) => (k.get(),v.toString()))
map.take(1);
res5: Array[(Long, String)] = Array((0,0))
map.take(5);
res4: Array[(Long, String)] = Array((0,0), (1,1), (2,2), (3,3), (4,4))
关于hadoop - 带有序列文件的 Spark RDD take(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33660497/