hadoop - "Starting flush of map output"在 hadoop 映射任务中花费很长时间

Tags: hadoop map flush

I am running a map task on a small file (3-4 MB), but its map output is relatively large (150 MB). After the job shows Map 100%, it takes a long time to finish the spill. Please suggest how I can reduce this period. Some example logs follow...

13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output
13/07/10 17:45:32 INFO mapred.JobClient:  map 98% reduce 0%
13/07/10 17:45:34 INFO mapred.LocalJobRunner: 
13/07/10 17:45:35 INFO mapred.JobClient:  map 100% reduce 0%
13/07/10 17:45:37 INFO mapred.LocalJobRunner: 
13/07/10 17:45:40 INFO mapred.LocalJobRunner: 
...(the same empty LocalJobRunner status line repeats every 3 seconds)...
13/07/10 17:47:43 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
13/07/10 17:47:45 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
...............................
...............................
...............................
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22
13/07/10 17:47:52 INFO mapred.JobClient:   File Output Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:     Bytes Written=13401245
13/07/10 17:47:52 INFO mapred.JobClient:   FileSystemCounters
13/07/10 17:47:52 INFO mapred.JobClient:     FILE_BYTES_READ=18871098
13/07/10 17:47:52 INFO mapred.JobClient:     HDFS_BYTES_READ=7346566
13/07/10 17:47:52 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=35878426
13/07/10 17:47:52 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=18621307
13/07/10 17:47:52 INFO mapred.JobClient:   File Input Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:     Bytes Read=2558288
13/07/10 17:47:52 INFO mapred.JobClient:   Map-Reduce Framework
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce input groups=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Map output materialized bytes=13320006
13/07/10 17:47:52 INFO mapred.JobClient:     Combine output records=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Map input records=71040
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/07/10 17:47:52 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce output records=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Spilled Records=1480000
13/07/10 17:47:52 INFO mapred.JobClient:     Map output bytes=119998400
13/07/10 17:47:52 INFO mapred.JobClient:     CPU time spent (ms)=0
13/07/10 17:47:52 INFO mapred.JobClient:     Total committed heap usage (bytes)=1178009600
13/07/10 17:47:52 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient:     Combine input records=7499900
13/07/10 17:47:52 INFO mapred.JobClient:     Map output records=7499900
13/07/10 17:47:52 INFO mapred.JobClient:     SPLIT_RAW_BYTES=122
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce input records=740000

Map task source code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;   // split(String, String) lives in Commons Lang, bundled with Hadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GsMR2MapThree extends Mapper<Text, Text, LongWritable, DoubleWritable> {

    private DoubleWritable distGexpr = new DoubleWritable();
    private LongWritable m2keyOut = new LongWritable();
    int trMax, tstMax;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        tstMax = conf.getInt("mtst", 10);
        trMax = conf.getInt("mtr", 10);
    }

    @Override
    public void map(Text key, Text values, Context context) throws IOException, InterruptedException {
        String line = values.toString();

        double Tij = 0.0, TRij = 0.0, dist = 0;
        int i = 0, j;
        long m2key = 0;
        String[] SLl = new String[]{};

        Configuration conf = context.getConfiguration();

        m2key = Long.parseLong(key.toString());
        StringTokenizer tokenizer = new StringTokenizer(line);
        j = 0;
        while (tokenizer.hasMoreTokens()) {
            String test = tokenizer.nextToken();
            if (j == 0) {
                Tij = Double.parseDouble(test);
            } else if (j == 1) {
                TRij = Double.parseDouble(test);
            } else if (j == 2) {
                // the third token names an entry of the broadcast variable (job Configuration)
                SLl = StringUtils.split(conf.get(test), ",");
            }
            j++;
        }
        // Map input ends

        // Distance measure: squared difference, truncated to a whole number by the cast
        dist = (long) Math.pow(Tij - TRij, 2);

        // remove gid from key
        m2key = m2key / 100000;
        // Map2 <key,value> emit starts
        for (i = 0; i < SLl.length; i++) {
            long m2keyNew = (Integer.parseInt(SLl[i]) * (trMax * tstMax)) + m2key;
            m2keyOut.set(m2keyNew);
            distGexpr.set(dist);
            context.write(m2keyOut, distGexpr);
        }
        // <key,value> emit done
    }
}

Sample map input: the last field on each line is a key used to fetch an array of integers from a broadcast variable (the job Configuration). Each line produces roughly 100-200 output records.

10100014    1356.3238 1181.63 gs-4-56
10100026    3263.1167 3192.4131 gs-3-21
10100043    1852.0 1926.3962 gs-4-76
10100062    1175.5925 983.47125 gs-3-19
10100066    606.59125 976.26625 gs-8-23

Sample map output:

10101   8633.0
10102   1822.0
10103   13832.0
10104   2726470.0
10105   1172991.0
10107   239367.0
10109   5410384.0
10111   7698352.0
10112   6.417

Best answer

I imagine you have solved the problem by now (two years after the original post), but for anyone who faces the same issue, I will try to give some suggestions.

Judging from your counters, I can see that you already use compression (since the map output materialized bytes differ from the map output bytes), which is a good thing. You can compress the mapper's output further by using the variable-length encoded class VLongWritable as the map output key type. (There used to be a VDoubleWritable class too, if I remember correctly, but it has certainly been deprecated by now.)
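For illustration, a minimal sketch of that key-type change (the class name GsMR2MapThreeVLong is made up, and the placeholder values stand in for the original parsing and distance logic):

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical variant of the mapper with a variable-length output key:
// VLongWritable serializes small values in fewer bytes than LongWritable's fixed 8.
public class GsMR2MapThreeVLong extends Mapper<Text, Text, VLongWritable, DoubleWritable> {

    private final VLongWritable m2keyOut = new VLongWritable();
    private final DoubleWritable distGexpr = new DoubleWritable();

    @Override
    public void map(Text key, Text values, Context context)
            throws IOException, InterruptedException {
        // ... same parsing and distance logic as in the original map() ...
        m2keyOut.set(10101L);   // placeholder for m2keyNew
        distGexpr.set(0.0);     // placeholder for dist
        context.write(m2keyOut, distGexpr);
    }
}

The driver would also need job.setMapOutputKeyClass(VLongWritable.class); so that the framework serializes the intermediate keys with the variable-length class.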

In the for loop where you emit the output, there is no need to set the distGexpr variable every time; it is always the same, so set it just before the loop. You can also store the product trMax*tstMax in a long outside the loop, instead of calculating it on every iteration.
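For illustration, a drop-in sketch of the rewritten emit section inside map(), using the names from the original mapper (the name stride is made up):

// distGexpr and the key stride are invariant per input record,
// so compute them once before the loop.
distGexpr.set(dist);
final long stride = (long) trMax * tstMax;   // the product stored in a long, as suggested

for (i = 0; i < SLl.length; i++) {
    m2keyOut.set(Integer.parseInt(SLl[i]) * stride + m2key);
    context.write(m2keyOut, distGexpr);
}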

If possible, make your input key a LongWritable (coming from the previous job), so that you can save the Long.parseLong() and Text.toString() calls.
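For illustration, a minimal sketch of the mapper keyed by LongWritable, assuming the previous job can emit LongWritable keys (e.g. through a SequenceFile):

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Same mapper shape, but keyed by LongWritable so the key needs no string round-trip.
public class GsMR2MapThree extends Mapper<LongWritable, Text, LongWritable, DoubleWritable> {

    @Override
    public void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {
        long m2key = key.get();   // replaces Long.parseLong(key.toString())
        // ... rest of the original map body unchanged ...
    }
}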

If possible (depending on your reducer), use a combiner to reduce the size of the spilled bytes.
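For illustration, a minimal sketch of such a combiner, assuming the reduce side computes a per-key sum (the class name DistSumCombiner is hypothetical; if your reduce function is not associative and commutative, reusing it this way is not safe):

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: pre-aggregates map output locally so fewer bytes
// are spilled and shuffled. Only valid for associative, commutative reductions.
public class DistSumCombiner
        extends Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {

    private final DoubleWritable result = new DoubleWritable();

    @Override
    protected void reduce(LongWritable key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0.0;
        for (DoubleWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

It would be registered in the driver with job.setCombinerClass(DistSumCombiner.class).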

I cannot find a way to skip the Integer.parseInt() calls inside the for loop, unless you can load SLl as an int[] in the first place.
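One hypothetical way to get close to that is to parse each broadcast entry once per task and cache the resulting int[] (a sketch to place inside the mapper class; parsedCache and lookupInts are made-up names, and it assumes the set of distinct keys such as gs-4-56 is small enough to hold in memory):

import java.util.HashMap;
import java.util.Map;

// Per-task cache: each distinct broadcast key is parsed into an int[] once;
// later records with the same key skip Integer.parseInt entirely.
private final Map<String, int[]> parsedCache = new HashMap<String, int[]>();

private int[] lookupInts(Configuration conf, String name) {
    int[] cached = parsedCache.get(name);
    if (cached == null) {
        String[] parts = StringUtils.split(conf.get(name), ",");
        cached = new int[parts.length];
        for (int k = 0; k < parts.length; k++) {
            cached[k] = Integer.parseInt(parts[k]);
        }
        parsedCache.put(name, cached);
    }
    return cached;
}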

Regarding hadoop - "Starting flush of map output" taking a long time in a hadoop map task, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/17567257/
