我在 hdfs 上有一些基于 thrift-compact 的自定义序列化格式文件作为输入。通过扩展 hadoop 的 FileInputFormat,我们可以快速将文件加载到 RDD 结构中。
现在,在应用一些 groupBy 转换后,输出 RDD 变为 JavaPairRDD<Long, Iterable<thriftGeneratedClass>>
.
现在,我想通过 PairRDD 中的键将 groupBy RDD 结果保存回具有多个输出文件的 HDFS。 E.X.:如果我们在PairRDD中有两个key-100、200,就会生成两个文件100.thrift和200.thrift。每个文件都包含 thrift 类的所有可迭代列表。
代码如下:
//Feature is some thrift generated class
JavaRDD<Feature> featureJavaRDD = jsc.newAPIHadoopFile(inputPath, ThriftInputFormat.class,
NullWritable.class, Feature.class, jsc.hadoopConfiguration()).values();
JavaPairRDD<Long, Iterable<Feature>> groupByRDD = featureJavaRDD.groupBy(...)
//how to save groupByRDD results back to HDFS with files by key
我的问题是:实现这一目标的最佳方法是什么?我知道答案可能涉及 hadoop
saveAsNewAPIHadoopFile
和 MultipleOutputs
.
最佳答案
几天前我有一个类似的用例,我通过编写两个实现 MultipleTextOutputFormat
的自定义类在 Java 中解决了它和 RecordWriter
.
我的输入是 JavaPairRDD<String, List<String>>
我想将它存储在一个由它的键命名的文件中,所有行都包含在它的值中。 (所以,这几乎是相同的用例)
这是我的 MultipleTextOutputFormat
的代码执行
class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
@Override
protected String generateFileNameForKeyValue(K key, V value, String name) {
return key.toString(); //The return will be used as file name
}
/** The following 4 functions are only for visibility purposes
(they are used in the class MyRecordWriter) **/
protected String generateLeafFileName(String name) {
return super.generateLeafFileName(name);
}
protected V generateActualValue(K key, V value) {
return super.generateActualValue(key, value);
}
protected String getInputFileBasedOutputFileName(JobConf job, String name) {
return super.getInputFileBasedOutputFileName(job, name);
}
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
return super.getBaseRecordWriter(fs, job, name, arg3);
}
/** Use my custom RecordWriter **/
@Override
RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
final String myName = this.generateLeafFileName(name);
return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
}
}
这是我的
RecordWriter
的代码执行。class MyRecordWriter<K, V> implements RecordWriter<K, V> {
private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
private final FileSystem fs;
private final JobConf job;
private final Progressable arg3;
private String myName;
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();
MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
this.fs = fs;
this.job = job;
this.arg3 = arg3;
this.myName = myName;
}
@Override
void write(K key, V value) throws IOException {
String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
RecordWriter rw = this.recordWriters.get(finalPath);
if(rw == null) {
rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
this.recordWriters.put(finalPath, rw);
}
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
}
@Override
void close(Reporter reporter) throws IOException {
Iterator keys = this.recordWriters.keySet().iterator();
while(keys.hasNext()) {
RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
rw.close(reporter);
}
this.recordWriters.clear();
}
}
大部分代码与
FileOutputFormat
中的完全相同.唯一的区别是那几行List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
这些行允许我写下我输入的每一行
List<String>
文件上。 write
的第一个参数函数设置为 null
为了避免在每一行上写 key 。要完成,我只需要执行此调用来编写我的文件
javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
关于java - 将 groupBy rdd 结果保存回 HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45454798/