我在一个小文件 (3-4 MB) 上执行 map task ,但 map 输出相对较大 (150 MB)。显示 Map 100% 后,需要很长时间才能完成溢出。请建议我如何减少这段时间。以下是一些示例日志...
13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output
13/07/10 17:45:32 INFO mapred.JobClient: map 98% reduce 0%
13/07/10 17:45:34 INFO mapred.LocalJobRunner:
13/07/10 17:45:35 INFO mapred.JobClient: map 100% reduce 0%
13/07/10 17:45:37 INFO mapred.LocalJobRunner:
13/07/10 17:45:40 INFO mapred.LocalJobRunner:
13/07/10 17:45:43 INFO mapred.LocalJobRunner:
13/07/10 17:45:46 INFO mapred.LocalJobRunner:
13/07/10 17:45:49 INFO mapred.LocalJobRunner:
13/07/10 17:45:52 INFO mapred.LocalJobRunner:
13/07/10 17:45:55 INFO mapred.LocalJobRunner:
13/07/10 17:45:58 INFO mapred.LocalJobRunner:
13/07/10 17:46:01 INFO mapred.LocalJobRunner:
13/07/10 17:46:04 INFO mapred.LocalJobRunner:
13/07/10 17:46:07 INFO mapred.LocalJobRunner:
13/07/10 17:46:10 INFO mapred.LocalJobRunner:
13/07/10 17:46:13 INFO mapred.LocalJobRunner:
13/07/10 17:46:16 INFO mapred.LocalJobRunner:
13/07/10 17:46:19 INFO mapred.LocalJobRunner:
13/07/10 17:46:22 INFO mapred.LocalJobRunner:
13/07/10 17:46:25 INFO mapred.LocalJobRunner:
13/07/10 17:46:28 INFO mapred.LocalJobRunner:
13/07/10 17:46:31 INFO mapred.LocalJobRunner:
13/07/10 17:46:34 INFO mapred.LocalJobRunner:
13/07/10 17:46:37 INFO mapred.LocalJobRunner:
13/07/10 17:46:40 INFO mapred.LocalJobRunner:
13/07/10 17:46:43 INFO mapred.LocalJobRunner:
13/07/10 17:46:46 INFO mapred.LocalJobRunner:
13/07/10 17:46:49 INFO mapred.LocalJobRunner:
13/07/10 17:46:52 INFO mapred.LocalJobRunner:
13/07/10 17:46:55 INFO mapred.LocalJobRunner:
13/07/10 17:46:58 INFO mapred.LocalJobRunner:
13/07/10 17:47:01 INFO mapred.LocalJobRunner:
13/07/10 17:47:04 INFO mapred.LocalJobRunner:
13/07/10 17:47:07 INFO mapred.LocalJobRunner:
13/07/10 17:47:10 INFO mapred.LocalJobRunner:
13/07/10 17:47:13 INFO mapred.LocalJobRunner:
13/07/10 17:47:16 INFO mapred.LocalJobRunner:
13/07/10 17:47:19 INFO mapred.LocalJobRunner:
13/07/10 17:47:22 INFO mapred.LocalJobRunner:
13/07/10 17:47:25 INFO mapred.LocalJobRunner:
13/07/10 17:47:28 INFO mapred.LocalJobRunner:
13/07/10 17:47:31 INFO mapred.LocalJobRunner:
13/07/10 17:47:34 INFO mapred.LocalJobRunner:
13/07/10 17:47:37 INFO mapred.LocalJobRunner:
13/07/10 17:47:40 INFO mapred.LocalJobRunner:
13/07/10 17:47:43 INFO mapred.LocalJobRunner:
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
13/07/10 17:47:45 INFO mapred.LocalJobRunner:
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
...............................
...............................
...............................
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22
13/07/10 17:47:52 INFO mapred.JobClient: File Output Format Counters
13/07/10 17:47:52 INFO mapred.JobClient: Bytes Written=13401245
13/07/10 17:47:52 INFO mapred.JobClient: FileSystemCounters
13/07/10 17:47:52 INFO mapred.JobClient: FILE_BYTES_READ=18871098
13/07/10 17:47:52 INFO mapred.JobClient: HDFS_BYTES_READ=7346566
13/07/10 17:47:52 INFO mapred.JobClient: FILE_BYTES_WRITTEN=35878426
13/07/10 17:47:52 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=18621307
13/07/10 17:47:52 INFO mapred.JobClient: File Input Format Counters
13/07/10 17:47:52 INFO mapred.JobClient: Bytes Read=2558288
13/07/10 17:47:52 INFO mapred.JobClient: Map-Reduce Framework
13/07/10 17:47:52 INFO mapred.JobClient: Reduce input groups=740000
13/07/10 17:47:52 INFO mapred.JobClient: Map output materialized bytes=13320006
13/07/10 17:47:52 INFO mapred.JobClient: Combine output records=740000
13/07/10 17:47:52 INFO mapred.JobClient: Map input records=71040
13/07/10 17:47:52 INFO mapred.JobClient: Reduce shuffle bytes=0
13/07/10 17:47:52 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient: Reduce output records=740000
13/07/10 17:47:52 INFO mapred.JobClient: Spilled Records=1480000
13/07/10 17:47:52 INFO mapred.JobClient: Map output bytes=119998400
13/07/10 17:47:52 INFO mapred.JobClient: CPU time spent (ms)=0
13/07/10 17:47:52 INFO mapred.JobClient: Total committed heap usage (bytes)=1178009600
13/07/10 17:47:52 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient: Combine input records=7499900
13/07/10 17:47:52 INFO mapred.JobClient: Map output records=7499900
13/07/10 17:47:52 INFO mapred.JobClient: SPLIT_RAW_BYTES=122
13/07/10 17:47:52 INFO mapred.JobClient: Reduce input records=740000
map task 源码:
public class GsMR2MapThree extends Mapper<Text, Text, LongWritable,DoubleWritable>{
private DoubleWritable distGexpr = new DoubleWritable();
private LongWritable m2keyOut = new LongWritable();
int trMax,tstMax;
protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
Configuration conf =context.getConfiguration();
tstMax = conf.getInt("mtst", 10);
trMax = conf.getInt("mtr", 10);
}
public void map(Text key, Text values, Context context) throws IOException, InterruptedException {
String line = values.toString();
double Tij=0.0,TRij=0.0, dist=0;
int i=0,j;
long m2key=0;
String[] SLl = new String[]{};
Configuration conf =context.getConfiguration();
m2key = Long.parseLong(key.toString());
StringTokenizer tokenizer = new StringTokenizer(line);
j=0;
while (tokenizer.hasMoreTokens()) {
String test = tokenizer.nextToken();
if(j==0){
Tij = Double.parseDouble(test);
}
else if(j==1){
TRij = Double.parseDouble(test);
}
else if(j==2){
SLl = StringUtils.split(conf.get(test),",");
}
j++;
}
//Map input ends
//Distance Measure function
dist = (long)Math.pow( (Tij - TRij), 2);
//remove gid from key
m2key = m2key / 100000;
//Map2 <key,value> emit starts
for(i=0; i<SLl.length;i++){
long m2keyNew = (Integer.parseInt(SLl[i])*(trMax*tstMax))+m2key;
m2keyOut.set(m2keyNew);
distGexpr.set(dist);
context.write(m2keyOut,distGexpr);
}
//<key,value> emit done
}
}
示例 map 输入:每行中的最后一个变量从广播变量中获取一个整数数组。每行将产生大约 100-200 条输出记录。
10100014 1356.3238 1181.63 gs-4-56
10100026 3263.1167 3192.4131 gs-3-21
10100043 1852.0 1926.3962 gs-4-76
10100062 1175.5925 983.47125 gs-3-19
10100066 606.59125 976.26625 gs-8-23
示例 map 输出:
10101 8633.0
10102 1822.0
10103 13832.0
10104 2726470.0
10105 1172991.0
10107 239367.0
10109 5410384.0
10111 7698352.0
10112 6.417
最佳答案
我想您已经解决了这个问题(发布原始消息 2 年后),但对于遇到同样问题的任何人,我会尝试提供一些建议。
从您的计数器来看,我知道您已经使用了压缩(因为映射输出物化字节数与映射输出字节数不同),这是一件好事。您可以使用可变长度编码的 VLongWritable 进一步压缩映射器的输出。类,作为映射输出键类型。 (以前也有一个 VDoubleWritable 类,如果我没记错的话,但现在肯定已经弃用了)。
在发出输出的 for 循环中,无需每次都设置 distGexpr
变量。它总是相同的,所以在 for 循环之前设置它。您还可以在循环外存储一个长整数和 trMax*tstMax
的乘积,而不是在每次迭代时计算它。
如果可能,将您的输入键设为 LongWritable(来自上一个作业),以便您可以保存 Long.parseLong()
和 Text.toString()
调用。
如果可能(取决于你的 reducer ),使用组合器来减少溢出字节的大小。
我找不到一种方法来跳过 for 循环中的 Integer.parseInt()
调用,但是如果您最初可以将 SLl
加载为int[]
.
关于hadoop - "Starting flush of map output"在 hadoop 映射任务中花费很长时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17567257/