hadoop - 如何在hadoop reducer中编写不同格式的多个输出?

标签 hadoop mapreduce cassandra reduce

如何在reducer中使用MultipleOutputs类来编写多个输出,每个输出都可以有自己独特的配置? MultipleOutputs javadoc 中有一些文档,但似乎仅限于文本输出。事实证明,MultipleOutputs 可以处理每个输出的输出路径、键类和值类,但尝试使用需要使用其他配置属性的输出格式会失败。

(这个问题已经出现好几次了,但我试图回答这个问题的尝试却遭到了挫败,因为提问者实际上有一个不同的问题。由于这个问题已经花了我几天的时间来调查才能回答,所以我正在回答根据 this Meta Stack Overflow question 的建议,我自己的问题在这里。)

最佳答案

我已经浏览了 MultipleOutputs 实现,发现它不支持任何具有 outputDir、键类和值类之外的属性的 OutputFormatType。我尝试编写自己的 MultipleOutputs 类,但失败了,因为它需要调用 Hadoop 类中某处的私有(private)方法。

我只剩下一种似乎适用于所有情况以及输出格式和配置的所有组合的解决方法:编写我想要使用的 OutputFormat 类的子类(结果证明这些是可重用的)。这些类了解其他 OutputFormat 正在同时使用,并且知道如何存储它们的属性。该设计利用了这样一个事实:OutputFormat 可以在被请求其 RecordWriter 之前使用上下文进行配置。

我已经让它与 Cassandra 的 ColumnFamilyOutputFormat 一起使用:

package com.myorg.hadoop.platform;

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

public abstract class ConcurrentColumnFamilyOutputFormat 
                        extends ColumnFamilyOutputFormat 
                        implements Configurable {

private static String[] propertyName = {
        "cassandra.output.keyspace" ,
        "cassandra.output.keyspace.username" ,
        "cassandra.output.keyspace.passwd" ,
        "cassandra.output.columnfamily" ,
        "cassandra.output.predicate",
        "cassandra.output.thrift.port" ,
        "cassandra.output.thrift.address" ,
        "cassandra.output.partitioner.class"
        };

private Configuration configuration;

public ConcurrentColumnFamilyOutputFormat() {
    super();
}

public Configuration getConf() {
    return configuration;
}

public void setConf(Configuration conf) {

    configuration = conf;

    String prefix = "multiple.outputs." + getMultiOutputName() + ".";

    for (int i = 0; i < propertyName.length; i++) {
        String property = prefix + propertyName[i];
        String value = conf.get(property);
        if (value != null) {
            conf.set(propertyName[i], value);
        }
    }

}

public void configure(Configuration conf) {

    String prefix = "multiple.outputs." + getMultiOutputName() + ".";

    for (int i = 0; i < propertyName.length; i++) {
        String property = prefix + propertyName[i];
        String value = conf.get(propertyName[i]);
        if (value != null) {
            conf.set(property, value);
        }
    }

}

public abstract String getMultiOutputName();

}

对于您想要 reducer 的每个 Cassandra(在本例中)输出,您将有一个类:

package com.myorg.multioutput.ReadCrawled;

import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;

public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {

    public StrongOutputFormat() {
        super();
    }

    @Override
    public String getMultiOutputName() {
        return "Strong";
    }

}

并且您可以在映射器/ reducer 配置类中配置它:

    // This is how you'd normally configure the ColumnFamilyOutputFormat

ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

    // This is how you tell the MultipleOutput-aware OutputFormat that
    // it's time to save off the configuration so no other OutputFormat
    // steps all over it.

new StrongOutputFormat().configure(job.getConfiguration());

    // This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
    // to out set of outputs

MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);

再举一个例子,FileOutputFormat 的 MultipleOutput 子类使用这些属性:

    private static String[] propertyName = {
        "mapred.output.compression.type" ,
        "mapred.output.compression.codec" ,
        "mapred.output.compress" ,
        "mapred.output.dir"
        };

并且将像上面的 ConcurrentColumnFamilyOutputFormat 一样实现,只不过它会使用上述属性。

关于hadoop - 如何在hadoop reducer中编写不同格式的多个输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13574821/

相关文章:

hadoop - 将数据作为 Parquet 从 SQL Server 加载到 S3 - AWS EMR

hadoop - 如何在 MapReduce 中设置执行者编号?

java - 使用 ArrayWritable 的序列化似乎以一种有趣的方式工作

nosql - Cassandra 时间序列数据

java - 如何使用 Cassandra Java 驱动程序访问带有嵌套映射的列表

cassandra - 在 cassandra 中创建自定义分区器以将记录索引到特定节点

python - 使用 yarn 比较器在 MapReduce Python 中进行字数统计排序

hadoop - CustomWritable 对象导致测试用例失败

java - Cassandra setInputSplitSize 无法正常工作

hadoop - Hbase浏览器显示Api错误:TSocket读取0字节的色相