hadoop - Generating multiple output files from a mapper in Hadoop MapReduce

Tags: hadoop mapreduce chain multipleoutputs mappers

I am generating two output files from my mapper using the MultipleOutputs API. I am not sure whether this is the correct approach. Here is my code.
Please go through it and give me your suggestions. When I run the code I get the error: java.lang.NullPointerException.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Export_Column_Mapping 
{
    public static void main(String[] args) throws Exception 
    {
        String Output_filetype = args[2];
        String Input_column_number = args[3];
        String Output_column_number = args[4];


        Configuration conf = new Configuration();

        conf.setStrings("output_filetype",Output_filetype);
        conf.setStrings("Input_column_number",Input_column_number);
        conf.setStrings("Output_column_number",Output_column_number);

        Job job = new Job(conf, "Export_Column_Mapping");
        job.setJarByClass(Export_Column_Mapping.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(Export_Column_Mapping_Mapper.class);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


    public static class Export_Column_Mapping_Mapper extends Mapper<LongWritable,Text,Text,Text>
    {   
        private MultipleOutputs<Text, LongWritable> mos ;

        public void setup(TaskInputOutputContext<?, ?, Text, LongWritable> context) {
            mos = new MultipleOutputs<Text, LongWritable>(context);
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            String str_Output_filetype = context.getConfiguration().get("output_filetype"); 

            String str_Input_column_number = context.getConfiguration().get("Input_column_number");
            String[] input_columns_number = str_Input_column_number.split(",");

            String str_Output_column_number= context.getConfiguration().get("Output_column_number");    
            String[] output_columns_number = str_Output_column_number.split(",");

            String str_line = value.toString();
            String[] input_column_array = str_line.split(",");
            String[] Detail_output_column_array = new String[27];
            String[] Shop_output_column_array = new String[8];
            String details_output = null ;
            String Shop_output = null;
            try
            {
                for(int i = 0;i<=input_column_array.length+1; i++)
                {
                    int int_outputcolumn = Integer.parseInt(output_columns_number[i]);
                    int int_inputcolumn = Integer.parseInt(input_columns_number[i]);

                    if((int_inputcolumn != 0) && (int_outputcolumn != 0) && output_columns_number.length == input_columns_number.length){

                        Detail_output_column_array[int_outputcolumn-1] = input_column_array[int_inputcolumn-1];
                        Shop_output_column_array[0] = Detail_output_column_array[0];
                        Shop_output_column_array[1] = Detail_output_column_array[1];
                        Shop_output_column_array[2] = Detail_output_column_array[2];
                        Shop_output_column_array[3] = Detail_output_column_array[3];
                        Shop_output_column_array[4] = Detail_output_column_array[14];

                        if(details_output != null)
                        {
                            details_output = details_output+"       "+ Detail_output_column_array[int_outputcolumn-1];
                            Shop_output = Shop_output+"     "+ Shop_output_column_array[int_outputcolumn-1];

                        }else
                        {
                            details_output = Detail_output_column_array[int_outputcolumn-1];
                            Shop_output =  Shop_output_column_array[int_outputcolumn-1];

                        }
                    }
                }

            }catch (Exception e)
            {

            }
            mos.write("Details File", null, details_output);
            mos.write("Shop File", null, Shop_output);
        }
    }
}

Here is the log:

Error: java.lang.NullPointerException
    at com.nielsen.grfe.Export_Column_Mapping$Export_Column_Mapping_Mapper.map(Export_Column_Mapping.java:113)
    at com.nielsen.grfe.Export_Column_Mapping$Export_Column_Mapping_Mapper.map(Export_Column_Mapping.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

Best Answer

Your MultipleOutputs is declared as:

private MultipleOutputs<Text, LongWritable> mos ;

but you are writing to it as:
mos.write("Details File", null, details_output);
mos.write("Shop File", null, Shop_output);

The call should instead be of the form:
mos.write(<key of the declared key type>, <value of the declared value type>, <output directory name as a String>)

Change the declaration, the write calls, and the corresponding code as follows:
private MultipleOutputs<NullWritable, Text> mos;

mos.write(NullWritable.get(), new Text(details_output), "Details File");
mos.write(NullWritable.get(), new Text(Shop_output), "Shop File");

public static class Export_Column_Mapping_Mapper extends Mapper<LongWritable, Text, NullWritable, Text>

job.setOutputKeyClass(NullWritable.class);
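
On the driver side, because the job is map-only (setNumReduceTasks(0)), the declared output key/value classes should match the mapper's new output types. A minimal driver-side sketch follows; the LazyOutputFormat line is an optional assumption, commonly paired with MultipleOutputs so that empty default part-* files are not created when every record goes through mos.write():

// Driver-side sketch: key class changes to NullWritable to match the mapper.
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

// Optional (assumption): with LazyOutputFormat the default part-* files are
// only created if something is actually written to the main output.
// import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);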

Also include the cleanup code:
@Override
protected void cleanup(Context context)
        throws IOException, InterruptedException {
    mos.close();
}
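
Putting the pieces together, here is a minimal sketch of the corrected mapper (not the full column-mapping logic; the base output path names "Details" and "Shop" are placeholders, and org.apache.hadoop.io.NullWritable must be imported). Note that setup() has to override Mapper.setup(Context); with the TaskInputOutputContext signature used in the question it is never invoked by the framework, so mos stays null and mos.write() throws the NullPointerException.

    // Sketch only: replaces the nested mapper inside Export_Column_Mapping.
    public static class Export_Column_Mapping_Mapper extends Mapper<LongWritable, Text, NullWritable, Text>
    {
        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        protected void setup(Context context)
        {
            // Runs once per task before any map() call; initialises the side outputs.
            mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            // ... build details_output and Shop_output from the input line ...
            String details_output = value.toString();   // placeholder
            String Shop_output = value.toString();      // placeholder

            // write(key, value, baseOutputPath) - the third argument is the output file name prefix.
            mos.write(NullWritable.get(), new Text(details_output), "Details");
            mos.write(NullWritable.get(), new Text(Shop_output), "Shop");
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException
        {
            mos.close();   // flush and close all side outputs
        }
    }

With the three-argument write(key, value, baseOutputPath), the records end up in files such as Details-m-00000 and Shop-m-00000 under the job's output directory.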

Regarding "hadoop - Generating multiple output files from a mapper in Hadoop MapReduce", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/33097160/
