java - MapReduce 作业挂起

标签 java hadoop mapreduce

我是 Hadoop 的 MapReduce 新手。我写了一个 map reduce 任务,我想在我的本地机器上运行它。但工作在 map 100% 后挂起。

下面是代码,我不明白我错过了什么。

我有一个自定义键类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class AirlineMonthKey implements WritableComparable<AirlineMonthKey>{

Text airlineName;
Text month;

public AirlineMonthKey(){
    super();
}

public AirlineMonthKey(Text airlineName, Text month) {
    super();
    this.airlineName = airlineName;
    this.month = month;
}

public Text getAirlineName() {
    return airlineName;
}

public void setAirlineName(Text airlineName) {
    this.airlineName = airlineName;
}

public Text getMonth() {
    return month;
}

public void setMonth(Text month) {
    this.month = month;
}

@Override
public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    this.airlineName.readFields(in);
    this.month.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    this.airlineName.write(out);
    this.month.write(out);      
}

@Override
public int compareTo(AirlineMonthKey airlineMonthKey) {
    // TODO Auto-generated method stub
    int diff = getAirlineName().compareTo(airlineMonthKey.getAirlineName());
    if(diff != 0){
        return diff;
    }

    int m1 = Integer.parseInt(getMonth().toString());
    int m2 = Integer.parseInt(airlineMonthKey.getMonth().toString());

    if(m1>m2){
        return -1;
    }
    else 
        return 1;
}


}

以及使用自定义键的映射器和缩减器类,如下所示。

package com.mapresuce.secondarysort;

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.opencsv.CSVReader;

public class FlightDelayByMonth {

public static class FlightDelayByMonthMapper extends
        Mapper<Object, Text, AirlineMonthKey, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String str = value.toString();
        // Reading Line one by one from the input CSV.
        CSVReader reader = new CSVReader(new StringReader(str));
        String[] split = reader.readNext();
        reader.close();

        String airlineName = split[6];
        String month = split[2];
        String year = split[0];
        String delayMinutes = split[37];
        String cancelled = split[41];

        if (!(airlineName.equals("") || month.equals("") || delayMinutes
                .equals(""))) {
            if (year.equals("2008") && cancelled.equals("0.00")) {
                AirlineMonthKey airlineMonthKey = new AirlineMonthKey(
                        new Text(airlineName), new Text(month));
                Text delay = new Text(delayMinutes);
                context.write(airlineMonthKey, delay);
                System.out.println("1");
            }
        }

    }
}

public static class FlightDelayByMonthReducer extends
        Reducer<AirlineMonthKey, Text, Text, Text> {


    public void reduce(AirlineMonthKey key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for(Text val : values){
            context.write(new Text(key.getAirlineName().toString()+" "+key.getMonth().toString()), val);
        }
    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {   
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage:<in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "Average monthly flight dealy");
    job.setJarByClass(FlightDelayByMonth.class);
    job.setMapperClass(FlightDelayByMonthMapper.class);
    job.setReducerClass(FlightDelayByMonthReducer.class);
    job.setOutputKeyClass(AirlineMonthKey.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

此外,我还在主目录中创建了一个作业和配置。不知道我错过了什么。我在本地环境中运行所有这些。

最佳答案

尝试在您的 AirlineMonthKey 类中编写 toString、equals 和 hashcode 的自定义实现。

阅读下面的链接。

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/WritableComparable.html

关键类型实现 hashCode() 很重要。

希望对您有所帮助。

关于java - MapReduce 作业挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29053688/

相关文章:

Java导入外部jar文件

java - 如何更改 AdListener 的重写方法内的变量值?

sql - 在配置单元中使用 wm_concat 时,如何用 0 填充不存在的值?

hadoop - 典型的 Hadoop 架构和 MapR 架构之间的区别

java - Cloudera HBase 连接超时异常

hadoop - 为什么 DISTINCT 在 Pig 中比 GROUP BY/FOREACH 快

java - 在输出中的每个数字之间实现连字符

java - 遍历异构容器

mapreduce - Couchbase 延迟和 View

java - 将 CSV 转换为 ORC 时出现异常