我是 Hadoop 新手。我想运行一个 MapReduce 示例并使用计算器映射器查看其结果。也就是我想知道,每个中间结果是由哪个mapper计算出来的?是否可以?如何?
我安装了 Hadoop 2.9.0(多节点集群)。
最佳答案
首先我们看一下示例代码(我已经安装了 HDP 集群,所以 .jar 文件的路径可能不同)
示例文本文件作为输入:
$ bin/hadoop dfs -ls /wordcount/input/
/wordcount/input/file01
/wordcount/输入/file02
$ bin/hadoop dfs -cat /wordcount/input/file01
Hello World 再见世界
$ bin/hadoop dfs -cat /wordcount/input/file02
你好 Hadoop 再见 Hadoop
运行应用程序:
$ bin/hadoop jar /usr/hdp/2.6x.x/hadoop-mapreduce/hadoo-mapreduce-examples.jar wordcount /wordcount/input /wordcount/output
注意:您不需要编写字数统计程序,正如我所提到的,它在 mapreduce 文件夹中默认提供。下面给出的代码仅供引用工作
输出:
$ bin/hadoop dfs -cat /wordcount/output/part-00000
再见 1
再见 1
Hadoop 2
你好 2
世界2
现在,让我们看看 mapper 和 reducer 在后端是如何工作的:
WordCount 应用程序非常简单。
映射器 实现 (第 14-26 行) ,通过 map 方法(第 18-25 行) , 一次处理一行,由指定的 提供文本输入格式(第 49 行) .然后,它通过 StringTokenizer 将行拆分为由空格分隔的标记,并发出 < , 1> 的键值对。
对于给定的样本输入 第一张 map 发出:
<你好,1>
<世界,1>
<再见,1>
<世界,1>
第二张 map 发出:
<你好,1>
< Hadoop,1>
<再见,1>
< Hadoop,1>
我们将在本教程的稍后部分详细了解为给定作业生成的 map 数量,以及如何以细粒度方式控制它们。
WordCount 还指定了 组合器(第 46 行 )。因此,每个 map 的输出在对键进行排序后,通过本地组合器(根据作业配置与 Reducer 相同)进行本地聚合。
的输出第一张 map :
<再见,1>
<你好,1>
<世界,2>
的输出第二张 map :
<再见,1>
< Hadoop,2>
<你好,1>
reducer 实现 (第 28-36 行) ,通过 reduce 方法(第 29-35 行)只是对值求和,这些值是每个键的出现计数(即本例中的单词)。
因此 作业的输出是 :
<再见,1>
<再见,1>
< Hadoop,2>
<你好,2>
<世界,2>
run 方法在 JobConf 中指定作业的各个方面,例如输入/输出路径(通过命令行传递)、键/值类型、输入/输出格式等。然后它调用 JobClient.runJob(第 55 行)提交并监控其进度。
现在,这里提到的字数统计程序是:
1. package org.myorg;
2.
3. import java.io.IOException;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.conf.*;
8. import org.apache.hadoop.io.*;
9. import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15. private final static IntWritable one = new IntWritable(1);
16. private Text word = new Text();
17.
18. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19. String line = value.toString();
20. StringTokenizer tokenizer = new StringTokenizer(line);
21. while (tokenizer.hasMoreTokens()) {
22. word.set(tokenizer.nextToken());
23. output.collect(word, one);
24. }
25. }
26. }
27.
28. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30. int sum = 0;
31. while (values.hasNext()) {
32. sum += values.next().get();
33. }
34. output.collect(key, new IntWritable(sum));
35. }
36. }
37.
38. public static void main(String[] args) throws Exception {
39. JobConf conf = new JobConf(WordCount.class);
40. conf.setJobName("wordcount");
44.
45. conf.setMapperClass(Map.class);
46. conf.setCombinerClass(Reduce.class);
47. conf.setReducerClass(Reduce.class);
48.
49. conf.setInputFormat(TextInputFormat.class);
50. conf.setOutputFormat(TextOutputFormat.class);
51.
52. FileInputFormat.setInputPaths(conf, new Path(args[0]));
53. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55. JobClient.runJob(conf);
57. }
58. }
59.
引用:MapReduce Tutorial
关于Hadoop:哪个映射器返回了哪个结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50444076/