java - MapReduce Mapper explanation

Tags: java hadoop mapreduce

Hadoop: The Definitive Guide has an example based on the NCDC weather dataset.
The Mapper class code is as follows:

Example 2-3. Mapper for maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

The driver code is:
Example 2-5. Application to find the maximum temperature in the weather dataset
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

What I don't understand is this: we pass in a file containing many lines, yet there is no loop over the lines anywhere. The code seems to process only a single line.

Best answer

The book explains what the type parameters of Mapper<LongWritable, Text, Text, IntWritable> mean: the input key is the byte offset within the file, and the input value is one line of text.

It also mentions that TextInputFormat is the default MapReduce input format, and that it is a subclass of FileInputFormat:

public class TextInputFormat
extends FileInputFormat<LongWritable,Text>

So the default input key-value type is a (LongWritable, Text) pair.

As the JavaDoc says:

Files are broken into lines. Either linefeed or carriage-return are used to signal end of line. Keys are the position in the file, and values are the line of text.
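To make that iteration concrete, here is a minimal, Hadoop-free sketch of what TextInputFormat's record reader effectively does: walk the file contents and produce one (byte offset, line) pair per line. The framework then calls map() once per pair, which is why the mapper itself never loops. The class and method names here are illustrative, not Hadoop API, and for simplicity only '\n' is handled (the real reader also handles '\r').

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LineSplitDemo {

  // Mimics TextInputFormat's record reader: key = byte offset of the
  // line's start within the file, value = the line's text.
  static Map<Long, String> toRecords(String fileContents) {
    Map<Long, String> records = new LinkedHashMap<>();
    long offset = 0;
    for (String line : fileContents.split("\n")) {
      records.put(offset, line);
      offset += line.length() + 1; // +1 for the '\n' byte
    }
    return records;
  }

  public static void main(String[] args) {
    String file = "first line\nsecond line\nthird line";
    // The framework, not the mapper, iterates: map() runs once per entry.
    toRecords(file).forEach((key, value) ->
        System.out.println(key + " -> " + value));
  }
}
```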



The book also has a section on defining custom RecordReaders.

If you want the job to read records that are anything other than single lines of text, you need to call job.setInputFormatClass with a different input format (or a custom FileInputFormat with its own RecordReader).
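To see that map() really only needs one line at a time, here is a Hadoop-free sketch that runs the mapper's parsing logic on a single synthetic 93-character record (NOT real NCDC data) laid out like the fixed-width fields the book describes: year at columns 15-18, signed temperature at 87-91, quality code at 92. The class and helper names are illustrative.

```java
import java.util.Arrays;

public class NcdcParseDemo {

  static final int MISSING = 9999;

  // Same logic as the mapper's map() body, minus the Hadoop types.
  // Returns "year<TAB>temperature" for a valid record, else null.
  static String mapOneLine(String line) {
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      return year + "\t" + airTemperature; // would be context.write(...)
    }
    return null;
  }

  // Builds one synthetic record with the fields in the expected columns.
  static String syntheticRecord() {
    char[] rec = new char[93];
    Arrays.fill(rec, '0');
    "1950".getChars(0, 4, rec, 15);  // year field, columns 15-18
    "-0011".getChars(0, 5, rec, 87); // temperature in tenths of a degree
    rec[92] = '1';                   // quality code: valid
    return new String(rec);
  }

  public static void main(String[] args) {
    // The framework would call this once per line of the input file;
    // here we call it once by hand.
    System.out.println(mapOneLine(syntheticRecord())); // prints: 1950	-11
  }
}
```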

For "java - MapReduce Mapper explanation", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49446058/
