java - Can't see the prints of the map tasks

Tags: java hadoop mapreduce

I have set up mapred-site.xml [1] and log4j.properties [2] to show debug logging, and I put System.out.printlns in my map class [3], but I can't see them being printed. I have looked through the Hadoop logs and the task logs, and I see nothing printed from the map tasks. I don't understand why this is happening. Any help?

[1] mapred-site.xml

    ~/Programs/hadoop$ cat etc/hadoop/mapred-site.xml
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

    <configuration>
    (...)
     <property> <name>mapreduce.map.log.level</name> <value>DEBUG</value> </property>
    </configuration>
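
As an aside, the same log level can also be raised for a single job from the driver instead of cluster-wide in mapred-site.xml; a minimal sketch, assuming the standard Hadoop 2.x Configuration/Job API and a hypothetical driver class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Sketch only (hypothetical class, not from the original question):
    // sets the same key as in [1], but per job rather than cluster-wide.
    public class DebugMapLogLevel {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("mapreduce.map.log.level", "DEBUG"); // same key as in mapred-site.xml above

            Job job = Job.getInstance(conf, "debug-map-log-level");
            // ... set mapper/reducer, input/output paths, then job.waitForCompletion(true)
        }
    }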

[2] log4j.properties
    hadoop.root.logger=DEBUG,console
    hadoop.log.dir=.
    hadoop.log.file=hadoop.log

    # Define the root logger to the system property "hadoop.root.logger".
    log4j.rootLogger=${hadoop.root.logger}, EventCounter

    # Logging Threshold
    log4j.threshold=ALL

    # Null Appender
    log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

    #
    # Rolling File Appender - cap space usage at 5gb.
    #
    hadoop.log.maxfilesize=256MB
    hadoop.log.maxbackupindex=20
    log4j.appender.RFA=org.apache.log4j.RollingFileAppender
    log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}

    log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
    log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}

    log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

    # Pattern format: Date LogLevel LoggerName LogMessage
    log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    # Debugging Pattern format
    #log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


    #
    # Daily Rolling File Appender
    #

    log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}

    # Rollover at midnight
    log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

    # 30-day backup
    #log4j.appender.DRFA.MaxBackupIndex=30
    log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

    # Pattern format: Date LogLevel LoggerName LogMessage
    log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    # Debugging Pattern format
    #log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


    #
    # console
    # Add "console" to rootlogger above if you want to use this 
    #

    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

    #
    # TaskLog Appender
    #

    #Default values
    hadoop.tasklog.taskid=null
    hadoop.tasklog.iscleanup=false
    hadoop.tasklog.noKeepSplits=4
    hadoop.tasklog.totalLogFileSize=100
    hadoop.tasklog.purgeLogSplits=true
    hadoop.tasklog.logsRetainHours=12

    log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
    log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
    log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
    log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}

    log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
    log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

    #
    # HDFS block state change log from block manager
    #
    # Uncomment the following to suppress normal block state change
    # messages from BlockManager in NameNode.
    #log4j.logger.BlockStateChange=WARN

    #
    #Security appender
    #
    hadoop.security.logger=INFO,NullAppender
    hadoop.security.log.maxfilesize=256MB
    hadoop.security.log.maxbackupindex=20
    log4j.category.SecurityLogger=${hadoop.security.logger}
    hadoop.security.log.file=SecurityAuth-${user.name}.audit
    log4j.appender.RFAS=org.apache.log4j.RollingFileAppender 
    log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
    log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
    log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize}
    log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex}

    #
    # Daily Rolling Security appender
    #
    log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender 
    log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
    log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout
    log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd

    #
    # hadoop configuration logging
    #

    # Uncomment the following line to turn off configuration deprecation warnings.
    # log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN

    #
    # hdfs audit logging
    #
    hdfs.audit.logger=INFO,NullAppender
    hdfs.audit.log.maxfilesize=256MB
    hdfs.audit.log.maxbackupindex=20
    log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
    log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
    log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
    log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
    log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
    log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
    log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

    #
    # mapred audit logging
    #
    mapred.audit.logger=INFO,NullAppender
    mapred.audit.log.maxfilesize=256MB
    mapred.audit.log.maxbackupindex=20
    log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
    log4j.additivity.org.apache.hadoop.mapred.AuditLogger=false
    log4j.appender.MRAUDIT=org.apache.log4j.RollingFileAppender
    log4j.appender.MRAUDIT.File=${hadoop.log.dir}/mapred-audit.log
    log4j.appender.MRAUDIT.layout=org.apache.log4j.PatternLayout
    log4j.appender.MRAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
    log4j.appender.MRAUDIT.MaxFileSize=${mapred.audit.log.maxfilesize}
    log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}

    # Custom Logging levels

    log4j.logger.org.apache.hadoop=DEBUG

    # Jets3t library
    log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR

    # AWS SDK & S3A FileSystem
    log4j.logger.com.amazonaws=ERROR
    log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
    log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN

    #
    # Event Counter Appender
    # Sends counts of logging messages at different severity levels to Hadoop Metrics.
    #
    log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter

    #
    # Job Summary Appender 
    #
    # Use following logger to send summary to separate file defined by 
    # hadoop.mapreduce.jobsummary.log.file :
    # hadoop.mapreduce.jobsummary.logger=DEBUG,JSA
    # 
    hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
    hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
    hadoop.mapreduce.jobsummary.log.maxfilesize=256MB
    hadoop.mapreduce.jobsummary.log.maxbackupindex=20
    log4j.appender.JSA=org.apache.log4j.RollingFileAppender
    log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
    log4j.appender.JSA.MaxFileSize=${hadoop.mapreduce.jobsummary.log.maxfilesize}
    log4j.appender.JSA.MaxBackupIndex=${hadoop.mapreduce.jobsummary.log.maxbackupindex}
    log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
    log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
    log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
    log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false


    log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
    log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false
    log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
    log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file}
    log4j.appender.RMSUMMARY.MaxFileSize=256MB
    log4j.appender.RMSUMMARY.MaxBackupIndex=20
    log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
    log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n

[3] My map class:
    static abstract class SampleMapBase
            extends Mapper<Text, Text, Text, Text>{
        private long total;
        private long kept = 0;
        private float keep;

        protected void setKeep(float keep) {
            this.keep = keep;
        }

        protected void emit(Text key, Text val, OutputCollector<Text,Text> out)
                throws IOException {
            System.out.println("Key: " + key.toString() + " Value: " + val.toString());
            System.out.println("Key: " + key.getClass() + " Value: " + val.getClass());
            ++total;
            while((float) kept / total < keep) {
                ++kept;
                out.collect(key, val);
            }
        }
    }


    public static class MySampleMapper extends SampleMapBase{
        String inputFile;

        public void setup(Context context) {
            this.inputFile = ((FileSplit) context.getInputSplit()).getPath().toString();
            setKeep(context.getConfiguration().getFloat("hadoop.sort.map.keep.percent", (float) 100.0) / (float) 100.0);
        }

        public void map(Text key, Text val,
                        OutputCollector<Text,Text> output, Reporter reporter)
                throws IOException {
            emit(key, val, output);
            System.out.println("EMIT");
        }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);

        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: webdatascan -Dfile.path=<path> [<in>...] <out>");
            System.exit(2);
        }

        System.setProperty("file.path", parser.getConfiguration().get("file.path"));
        List<String> remoteHosts = new ArrayList<String>();// directory where the map output is remotely
        remoteHosts.add(Inet4Address.getLocalHost().getHostAddress());

        // first map tasks
        JobConf conf = MyWebDataScan.addJob(1, true, true);

        Job job = new Job(conf, conf.getJobName());
        job.setJarByClass(MyWebDataScan.class);
        job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);

        job.setMapperClass(MySampleMapper.class);
        job.setReducerClass(MySampleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
        Path outputPath =  new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);
        job.getConfiguration().set("mapred.output.dir", outputPath.toString());

        JobClient.runJob(job);
    }

Best Answer

You have mixed the old API and the new API together. Your input and output format classes come from the new API, but your map function is written against the old API. If you have imported org.apache.hadoop.mapreduce.Mapper, the map function should have a signature like this:

    private static class RecordMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable lineOffset, Text record, Context output)
                throws IOException, InterruptedException {
            output.write(new Text("Count"), new LongWritable(1));
        }
    }

When you use a map function declared like the one below (the old-API form), your map method will never be called. Instead, the default (identity) map method explained in this video will be used:
    public void map(Text key,
                    Text val,
                    OutputCollector<Text, Text> output,
                    Reporter reporter)
            throws IOException
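
For reference, below is a minimal sketch of how the questioner's MySampleMapper could be ported to the new org.apache.hadoop.mapreduce API (an illustration based on the answer's advice, not code from the answer itself); with this signature the framework actually invokes map(), so the System.out.println output lands in the task attempt's stdout log:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical port of the questioner's mapper to the new API.
    // With this signature the framework actually calls map(), so the
    // System.out.println output ends up in the task attempt's stdout log.
    public class MySampleMapper extends Mapper<Text, Text, Text, Text> {
        private float keep;
        private long total;
        private long kept;

        @Override
        protected void setup(Context context) {
            // same sampling knob as in the question, defaulting to keep everything
            keep = context.getConfiguration()
                          .getFloat("hadoop.sort.map.keep.percent", 100f) / 100f;
        }

        @Override
        protected void map(Text key, Text val, Context context)
                throws IOException, InterruptedException {
            System.out.println("Key: " + key + " Value: " + val);
            ++total;
            while ((float) kept / total < keep) {
                ++kept;
                context.write(key, val); // emit through Context, not OutputCollector
            }
        }
    }

Once the job runs with the new-API mapper, the printed lines can be found in the per-attempt stdout log via the job history web UI, or with `yarn logs -applicationId <application id>` after the application finishes (assuming log aggregation is enabled).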

Regarding "java - Can't see the prints of the map tasks", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34644026/
