java - 将 groupBy rdd 结果保存回 HDFS

标签 java hadoop hdfs rdd thrift

我在 hdfs 上有一些基于 thrift-compact 的自定义序列化格式文件作为输入。通过扩展 hadoop 的 FileInputFormat,我们可以快速将文件加载到 RDD 结构中。

现在,在应用一些 groupBy 转换后,输出 RDD 变为 JavaPairRDD<Long, Iterable<thriftGeneratedClass>> .
现在,我想通过 PairRDD 中的键将 groupBy RDD 结果保存回具有多个输出文件的 HDFS。 E.X.:如果我们在PairRDD中有两个key-100、200,就会生成两个文件100.thrift和200.thrift。每个文件都包含 thrift 类的所有可迭代列表。
代码如下:

//Feature is some thrift generated class 
JavaRDD<Feature> featureJavaRDD = jsc.newAPIHadoopFile(inputPath, ThriftInputFormat.class,
            NullWritable.class, Feature.class, jsc.hadoopConfiguration()).values();
JavaPairRDD<Long, Iterable<Feature>> groupByRDD = featureJavaRDD.groupBy(...)
//how to save groupByRDD results back to HDFS with files by key

我的问题是:实现这一目标的最佳方法是什么?我知道答案可能涉及 hadoop saveAsNewAPIHadoopFileMultipleOutputs .

最佳答案

几天前我有一个类似的用例,我通过编写两个实现 MultipleTextOutputFormat 的自定义类在 Java 中解决了它和 RecordWriter .

我的输入是 JavaPairRDD<String, List<String>>我想将它存储在一个由它的键命名的文件中,所有行都包含在它的值中。 (所以,这几乎是相同的用例)

这是我的 MultipleTextOutputFormat 的代码执行

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /** The following 4 functions are only for visibility purposes                 
    (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
        return super.getInputFileBasedOutputFileName(job, name);
        }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
    final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
} 

这是我的 RecordWriter 的代码执行。
class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}

大部分代码与 FileOutputFormat 中的完全相同.唯一的区别是那几行
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}

这些行允许我写下我输入的每一行 List<String>文件上。 write 的第一个参数函数设置为 null为了避免在每一行上写 key 。

要完成,我只需要执行此调用来编写我的文件
javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);

关于java - 将 groupBy rdd 结果保存回 HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45454798/

相关文章:

java - 获取自定义 Spring 作用域中的当前请求

java - 如何将 YYYY-MM-DDTHH :mm:ss. SSSZ 格式的时间转换为默认时区?

scala - AWS S3 中的 FileUtil.copyMerge()

hadoop - 以高吞吐量在 HDFS 上进行流式数据写入

java - 以通用方式设置 ObjectMapper 属性?

java - C++中new运算符和java中new运算符的区别

hadoop - 如何在Hadoop中设置NameNodes、DataNodes、Mappers和Reducers的数量

hadoop - hdfs dfs -test/dir_name的作用是什么?

java - 将 Apache Spark 添加到 Eclipse Maven 项目时出现问题

hadoop -cp 限制文件数量