java - Generating a file and writing to it in the map function

Tags: java, hadoop

I am generating a CSV file in my map function, so each map task produces one CSV file. This is a side effect, not the mapper's output. I name these files something like filename_inputkey. However, when I run the application on a single-node cluster, only one file is generated. My input has 10 rows and, as I understand it, there should be 10 mapper tasks and therefore 10 files generated. Please let me know if I am thinking about this the wrong way.

Here is my GWASInputFormat class:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class GWASInputFormat extends FileInputFormat<LongWritable, GWASGenotypeBean>{

@Override
public RecordReader<LongWritable, GWASGenotypeBean> getRecordReader(org.apache.hadoop.mapred.InputSplit input, JobConf job, Reporter arg2) throws IOException {
    return (RecordReader<LongWritable, GWASGenotypeBean>) new GWASRecordReader(job, (FileSplit)input);
}

}

Here is the GWASRecordReader:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class GWASRecordReader implements RecordReader<LongWritable, GWASGenotypeBean>{

private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;

@Override
public void close() throws IOException {
    if(lineReader != null) {
        lineReader.close();
    }
}

public GWASRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);
    lineKey = lineReader.createKey();
    lineValue = lineReader.createValue();
}

@Override
public LongWritable createKey() {
    return new LongWritable();
}

@Override
public GWASGenotypeBean createValue() {
    return new GWASGenotypeBean();
}

@Override
public long getPos() throws IOException {
    return lineReader.getPos();
}

@Override
public boolean next(LongWritable key, GWASGenotypeBean value) throws IOException {
    if(!lineReader.next(lineKey, lineValue)){
        return false;
    }

    String[] values = lineValue.toString().split(",");

    if(values.length !=32) {
        throw new IOException("Invalid Record ");
    }

    value.setPROJECT_NAME(values[0]);
    value.setRESEARCH_CODE(values[1]);
    value.setFACILITY_CODE(values[2]);
    value.setPROJECT_CODE(values[3]);
    value.setINVESTIGATOR(values[4]);
    value.setPATIENT_NUMBER(values[5]);
    value.setSAMPLE_COLLECTION_DATE(values[6]);
    value.setGENE_NAME(values[7]);
    value.setDbSNP_RefSNP_ID(values[8]);
    value.setSNP_ID(values[9]);
    value.setALT_SNP_ID(values[10]);
    value.setSTRAND(values[11]);
    value.setASSAY_PLATFORM(values[12]);
    value.setSOFTWARE_NAME(values[13]);
    value.setSOFTWARE_VERSION_NUMBER(values[14]);
    value.setTEST_DATE(values[15]);
    value.setPLATE_POSITION(values[16]);
    value.setPLATE_ID(values[17]);
    value.setOPERATOR(values[18]);
    value.setGENOTYPE(values[19]);
    value.setGENOTYPE_QS1_NAME(values[20]);
    value.setGENOTYPE_QS2_NAME(values[21]);
    value.setGENOTYPE_QS3_NAME(values[22]);
    value.setGENOTYPE_QS4_NAME(values[23]);
    value.setGENOTYPE_QS5_NAME(values[24]);
    value.setGENOTYPE_QS1_RESULT(values[25]);
    value.setGENOTYPE_QS2_RESULT(values[26]);
    value.setGENOTYPE_QS3_RESULT(values[27]);
    value.setGENOTYPE_QS4_RESULT(values[28]);
    value.setGENOTYPE_QS5_RESULT(values[29]);
    value.setSTAGE(values[30]);
    value.setLAB(values[31]);
    return true;
}

@Override
public float getProgress() throws IOException {
    return lineReader.getProgress();
}

}

The Mapper class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.google.common.base.Strings;

public class GWASMapper extends MapReduceBase implements Mapper<LongWritable, GWASGenotypeBean, Text, Text> {

private static Configuration conf;

@SuppressWarnings("rawtypes")
public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException {
    conf = context.getConfiguration();
    // Path[] otherFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
}


@Override
public void map(LongWritable inputKey, GWASGenotypeBean inputValue, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

    checkForNulls(inputValue, inputKey.toString());

    output.collect(new Text(inputValue.getPROJECT_CODE()), new Text(inputValue.getFACILITY_CODE()));
}

private void checkForNulls(GWASGenotypeBean user, String inputKey) {

    String f1 = " does not have a value_fail";
    String p1 = "Must not contain NULLS for required fields";
    // have to initialize these two to some paths in hdfs

    String edtChkRptDtl = "/user/hduser/output6/detail" + inputKey + ".csv";
    String edtChkRptSmry = "/user/hduser/output6/summary" + inputKey + ".csv";

    List<String> errSmry = new ArrayList<String>();
    Map<String, String> loc = new TreeMap<String, String>();

    if(Strings.isNullOrEmpty(user.getPROJECT_NAME())) {
        loc.put("test", "PROJECT_NAME ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getRESEARCH_CODE())) {
        loc.put("test", "RESEARCH_CODE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getFACILITY_CODE())) {
        loc.put("test", "FACILITY_CODE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPROJECT_CODE())) {
        loc.put("test", "PROJECT_CODE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getINVESTIGATOR())) {
        loc.put("test", "INVESTIGATOR ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPATIENT_NUMBER())) {
        loc.put("test", "PATIENT_NUMBER ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSAMPLE_COLLECTION_DATE())) {
        loc.put("test", "SAMPLE_COLLECTION_DATE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getGENE_NAME())) {
        loc.put("test", "GENE_NAME ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSTRAND())) {
        loc.put("test", "STRAND ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getASSAY_PLATFORM())) {
        loc.put("test", "ASSAY_PLATFORM ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSOFTWARE_NAME())) {
        loc.put("test", "SOFTWARE_NAME ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getTEST_DATE())) {
        loc.put("test", "TEST_DATE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPLATE_POSITION())) {
        loc.put("test", "PLATE_POSITION ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPLATE_ID())) {
        loc.put("test", "PLATE_ID ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getOPERATOR())) {
        loc.put("test", "OPERATOR ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getGENOTYPE())) {
        loc.put("test", "GENOTYPE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSTAGE())) {
        loc.put("test", "STAGE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getLAB())) {
        loc.put("test", "LAB ");
        errSmry.add("_fail");
    }

    String customNullMsg = "Required Genotype column(s)";
    List<String> error = new ArrayList<String>();
    String message = null;

    if(!loc.isEmpty()) {
        for (Map.Entry<String, String> entry : loc.entrySet()) {
        message = "line:" + entry.getKey() + " column:" + entry.getValue() + " " + f1;
        error.add(message);
        }
    } else {
        message = "_pass";
        error.add(message);
    }

    int cnt = 0;
    if(!errSmry.isEmpty()) {

        // not able to understand this. Are we trying to get the occurrences
        // of the last key that contains _fail?
        for (String key : errSmry) {
        if(key.contains("_fail")) {
            cnt = Collections.frequency(errSmry, key);
            // ******************** Nikhil added this
            break;
        }
        }

        if(cnt > 0) {
        writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "failed", Integer.toString(cnt));
        } else {
        writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0");
        }

    } else {
        writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0");
    }

    // loop the list and write out items to the error report file
    if(!error.isEmpty()) {
        for (String s : error) {
        //System.out.println(s);
        if(s.contains("_fail")) {
            String updatedFailmsg = s.replace("_fail", "");
            writeCsvFileDtl(edtChkRptDtl, "genotype", updatedFailmsg, "failed");
        }
        if(s.contains("_pass")) {
            writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed");
        }
        }
    } else {
        writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed");
    }
    // end loop
   }

 private void writeCsvFileDtl(String edtChkRptDtl, String col1, String col2, String col3) {
    try {
        if(conf == null) {
            conf = new Configuration();
        }
        FileSystem fs = FileSystem.get(conf);

        Path path = new Path(edtChkRptDtl);
        if (!fs.exists(path)) {
            FSDataOutputStream out = fs.create(path);
            out.writeChars(col1);
            out.writeChar(',');
            out.writeChars(col2);
            out.writeChar(',');
            out.writeChars(col3);
            out.writeChar('\n');
            out.flush();
            out.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

private void writeCsvFileSmry(String edtChkRptSmry, String col1, String col2, String col3, String col4) {
    try {
        if(conf == null) {
            conf = new Configuration();
        }
        FileSystem fs = FileSystem.get(conf);

        Path path = new Path(edtChkRptSmry);
        if (!fs.exists(path)) {
            FSDataOutputStream out = fs.create(path);
            out.writeChars(col1);
            out.writeChar(',');
            out.writeChars(col2);
            out.writeChar(',');
            out.writeChars(col3);
            out.writeChar(',');
            out.writeChars(col4);
            out.writeChar('\n');
            out.flush();
            out.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
}

Here is my driver class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GWASMapReduce extends Configured implements Tool{

/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    ToolRunner.run(configuration, new GWASMapReduce(), args);
}

@Override
public int run(String[] arg0) throws Exception {

    JobConf conf = new JobConf(new Configuration());
    conf.setInputFormat(GWASInputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setJarByClass(GWASMapReduce.class);
    conf.setMapperClass(GWASMapper.class);
    conf.setNumReduceTasks(0);
    FileInputFormat.addInputPath(conf, new Path(arg0[0]));
    FileOutputFormat.setOutputPath(conf, new Path(arg0[1]));
    JobClient.runJob(conf);
    return 0;
}
}

Best answer

There is probably only one Mapper task, with ten calls to its map method. If you want to write out one file per mapper, you should do that in its configure method. If you want to write out one file per input record, you should do that in its map method.
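For illustration, here is a minimal sketch (not part of the original answer) of the one-file-per-mapper approach with the old mapred API: the stream is opened once in configure and closed in close. The output path, the naming scheme, and the use of the "mapred.task.id" property are assumptions, so adapt them to your own layout.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class GWASMapper extends MapReduceBase
        implements Mapper<LongWritable, GWASGenotypeBean, Text, Text> {

    private FSDataOutputStream taskOut;   // one side-effect file per mapper task

    @Override
    public void configure(JobConf job) {
        try {
            // "mapred.task.id" (the task attempt id) is used here only to get a
            // per-task unique file name; any naming scheme would do
            String taskId = job.get("mapred.task.id", "unknown-task");
            FileSystem fs = FileSystem.get(job);
            // hypothetical output location, not from the original post
            taskOut = fs.create(new Path("/user/hduser/output6/sideeffect_" + taskId + ".csv"));
        } catch (IOException e) {
            throw new RuntimeException("Could not create per-task CSV file", e);
        }
    }

    @Override
    public void map(LongWritable key, GWASGenotypeBean value,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // append one CSV line per input record to this task's file
        taskOut.writeChars(value.getPROJECT_CODE() + "," + value.getFACILITY_CODE() + "\n");
        output.collect(new Text(value.getPROJECT_CODE()), new Text(value.getFACILITY_CODE()));
    }

    @Override
    public void close() throws IOException {
        if (taskOut != null) {
            taskOut.close();
        }
    }
}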

Edit: the above is irrelevant to the problem. The issue is that in the GWASRecordReader returned by your GWASInputFormat, you never set the key in the next method, so your map input key is always the same. Just add key.set(lineKey.get()); to the next method and it should work.
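For reference, this is roughly what the corrected next method in GWASRecordReader would look like; only the key.set line is new relative to the code posted above, and the 32-column parsing is abbreviated here.

@Override
public boolean next(LongWritable key, GWASGenotypeBean value) throws IOException {
    if (!lineReader.next(lineKey, lineValue)) {
        return false;
    }

    // propagate the byte offset from the wrapped LineRecordReader;
    // without this, every record reaches the mapper with the same key
    key.set(lineKey.get());

    String[] values = lineValue.toString().split(",");
    if (values.length != 32) {
        throw new IOException("Invalid Record ");
    }

    value.setPROJECT_NAME(values[0]);
    // ... remaining 31 setters exactly as in the original next() ...
    return true;
}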

The original question, "java - Generating a file and writing to it in the map function", can be found on Stack Overflow: https://stackoverflow.com/questions/13241284/
