hadoop - 带有序列文件的 Spark RDD take()

标签 hadoop apache-spark sequencefile

看起来 RDD.take() 只是在序列文件的支持下重复读取的最后一个元素。
例如:

val rdd = sc.sequenceFile("records.seq", classOf[LongWritable], classOf[RecordWritable])
val records: Array[(LongWritable, RecordWritable)] = rdd.take(5)
System.out.println(records.map(_._2.toString).mkString("\n"))

输出:

Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)

即使我知道行是唯一的。

sc.binaryRecords() 也存在此问题。

我意识到这可能与 Hadoop 可写缓存问题有关,但是否有解决此问题的计划?有任何解决方法吗?

最佳答案

我尝试复制您的问题,是的,当直接对 sc.sequenceFile() 的结果调用 take 时,我也看到了类似的行为。但能够找到解决方法:

注意:我正在解释使用 LongWritable 和 Text 而不是 RecordWritable。我不确定 RecordWritable 所需的导入
我的序列文件有:(0,0) (1,1) (2,2) ...

val rdd = sc.sequenceFile("sequencefile.seq", classOf[LongWritable], classOf[Text])
val map = rdd.map(case (k,v) => (k.get(),v.toString()))
map.take(1);
res5: Array[(Long, String)] = Array((0,0))
map.take(5);
res4: Array[(Long, String)] = Array((0,0), (1,1), (2,2), (3,3), (4,4))

关于hadoop - 带有序列文件的 Spark RDD take(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33660497/

相关文章:

python - Spark DataFrameWriter 使用 TIMESTAMP 而不是 DATETIME

java - Spark 上的 Alluxio 帧 size() 大于 max()

hadoop - 使用 Hive Session Close 清理资源

scala - 如何使用导入scala.sys.process程序包使用cat文件并计算文件中的行数

Hadoop集群数据节点网络错误

java - @JsonView 未渲染 Spark 中部署的 Spring Boot 应用程序

hadoop - 如何使用 Mahout 的序列文件 API 代码?

apache-spark - 在 PySpark 中获取序列文件格式文件的 HDFS 文件路径

hadoop - 在Hadoop 2.0中读取sequencefile

sql-server - 当表在SQL Server上包含点时,由错误的FROM子句引起的Sqoop无效对象名称错误