java - 使用Map Reduce连接多个文件

标签 java hadoop mapreduce

加入2个文件的内容:

第一个文件(包含员工姓名数据)

id,name
101,Gaurav
102,Rohit
103,Karishma
104,Darshan
105,Divya

第二个文件(包含员工部门数据)
id,dept
101,Sales
102,Research
103,NMG
104,Admin
105,HR

=========================

输出
id,name,dept
101,Gaurav,Sales
102,Rohit,Research
103,Karishma,NMG

我如何实现这种输出?

截至目前,我正在减少作为 reducer 中的随机值。

我想要按ID,名称,部门等指定顺序进行输出。
任何帮助表示赞赏。

Mapper类看起来像这样...
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text keyEmit = new Text();
private Text valEmit = new Text();
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
{
 String line=value.toString();
 String[] words=line.split(",");
 keyEmit.set(words[0]);
 valEmit.set(words[1]);
 context.write(keyEmit, valEmit);
}
}

reducer 类看起来像这样...
public class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    merge = key.toString(); // 101
    for(Text value : values) {
        merge +=  "," + value.toString();
    }
    context.write(NullWritable.get(), new Text(merge));
}
}

驱动程序类看起来像这样...
public class JoinDriver {
public final static void main(final String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Multiple join");

    job.setJarByClass(JoinDriver.class);
    // job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);

    MultipleInputs.addInputPath(job, new Path(args[0]),
            TextInputFormat.class, JoinMapper.class);

    MultipleInputs.addInputPath(job, new Path(args[1]),
            TextInputFormat.class, JoinMapper.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

到目前为止,输出如下,我希望按其本身顺序输入ID,名称,部门。

output as of now

最佳答案

您遇到的主要问题是值没有排序,因此您要对一个公用键进行分组,但是仅将值作为字符串发送并不会带来很大的帮助,因为您不知道哪个是名称,哪个是部门。

您有一些选择,所有这些都需要从映射器发送更多信息:

  • 使用辅助排序
  • 对化简器
  • 中的值进行排序

    最快的方法是在映射器中输出该值时将一些更多信息附加到该值上(理想情况下,您实际上将使用包含两个Text对象的复合值)。
    public class JoinMapperName extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable k, Text value, Context context) 
                              throws IOException, InterruptedException {
    
            String[] words = value.toString().split(",");
            context.write(new Text(words[0]), new Text("name:" + words[1]));
        }
    }
    
    public class JoinMapperDept extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable k, Text value, Context context) 
                              throws IOException, InterruptedException {
    
            String[] words = value.toString().split(",");
            context.write(new Text(words[0]), new Text("dept:" + words[1]));
        }
    }
    

    因此,现在每个数据源都有一个不同的映射器。并且您需要将 reducer 更改为以下内容:
    public class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) 
                  throws IOException, InterruptedException {
    
            String name = "";
            String dept = "";
            for(Text value : values) {
                if (value.toString().startsWith("name")) {
                    name = value.toString().split(":")[1];
                } else {
                    dept = value.toString().split(":")[1];
                }
            }
            String merge = key + "," name + "," + dept;
            context.write(NullWritable.get(), new Text(merge));
        }
    }
    

    这只是有关如何实现的简单示例。希望它能给您一些有关如何执行订购的想法。

    关于java - 使用Map Reduce连接多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46313945/

    相关文章:

    java - Apache2 代理 WebSocket 连接?

    hadoop - 在不创建 _temporary 文件夹的情况下将 Spark 数据帧作为 Parquet 写入 S3

    java - 获取 hadoop 中已完成和退役作业的详细信息

    hadoop - 在hadoop中是否可以有一个场景,其中只有1个map任务和0个reduce任务?

    java - 在 Map Reduce 作业中使用多线程

    java - 编写 Java 文档 - 显示输出示例

    java - 在 Java 中创建动态 3d 数组

    java - 使用JAVA套接字发送多个文件

    hadoop - 在 hadoop 中摄取数据后的标准流程

    python - MapReduce 批量更新数据存储