我正在从事一个大型 hadoop 项目,并且有一个小型 KPI,我必须在减少输出中仅写入前 10 个值。 为了完成这个要求,我使用了一个计数器并在计数器等于 11 时中断循环,但 reducer 仍然将所有值写入 HDFS。
这是一个非常简单的 java 代码,但我卡住了 :(
为了测试,我创建了一个独立的类(java 应用程序)来执行此操作,并且它在那里工作;我想知道为什么它在 reducer 代码中不起作用。
如果我遗漏了什么,请有人帮助我并提出建议。
map - 减少代码
package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ValueSortExp2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(true);
String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "Test commond");
job.setJarByClass(ValueSortExp2.class);
// Setup MapReduce
job.setMapperClass(MapTask2.class);
job.setReducerClass(ReduceTask2.class);
job.setNumReduceTasks(1);
// Specify key / value
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(IntComparator2.class);
// Input
FileInputFormat.addInputPath(job, new Path(arguments[0]));
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
job.setOutputFormatClass(TextOutputFormat.class);
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
public static class IntComparator2 extends WritableComparator {
public IntComparator2() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
return v1.compareTo(v2) * (-1);
}
}
public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {
String tokens[]= value.toString().split("\\t");
// int empId = Integer.parseInt(tokens[0]) ;
int count = Integer.parseInt(tokens[2]) ;
context.write(new IntWritable(count), new Text(value));
}
}
public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
int cnt=0;
public void reduce(IntWritable key, Iterable<Text> list, Context context)
throws java.io.IOException, InterruptedException {
for (Text value : list ) {
cnt ++;
if (cnt==11)
{
break;
}
context.write(new IntWritable(cnt), value);
}
}
}
}
简单的 JAVA 代码运行良好
package comparableTest;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class TestData {
//static int cnt=0;
public static void main(String args[]) throws IOException, InterruptedException {
ArrayList<String> list = new ArrayList<String>() {{
add("A");
add("B");
add("C");
add("D");
}};
reduce(list);
}
public static void reduce(Iterable<String> list)
throws java.io.IOException, InterruptedException {
int cnt=0;
for (String value : list ) {
cnt ++;
if (cnt==3)
{
break;
}
System.out.println(value);
}
}
}
示例数据 -- 标题只是更多信息,实际数据来自第 2 行
`ID NAME COUNT(需要显示top 10 desc)
1 玩具总动员 (1995) 2077
10 黄金眼 (1995) 888
100 市政厅 (1996) 128
1000 凝结 (1996) 20
1001 Associate, The (L'Associe)(1982) 0
1002 埃德的下一步行动(1996 年)8
1003 极端措施 (1996) 121
1004 闪光人 (1996) 101
1005 D3:威猛鸭子 (1996) 142
1006 室,(1996) 78
1007 苹果饺子帮 (1975) 232
1008 戴维·克罗克特,荒野之王 (1955) 97
1009逃往魔女山(1975)291
101 瓶火箭 (1996) 253
1010 爱虫 (1969) 242
1011 赫比再次骑行 (1974) 135
1012 老耶勒 (1957) 301
1013 parent 陷阱,(1961)258
1014 盲目乐观 (1960) 136
1015 归途:不可思议的旅程 (1993) 234
1016 毛茸茸的狗 (1959) 156
1017 瑞士家庭罗宾逊 (1960) 276
1018 那只该死的猫! (1965) 123
1019 海底 20,000 里格(1954 年)575
102 错先生(1996)60
1020 酷跑 (1993) 392
1021 外场天使 (1994) 247
1022 灰姑娘 (1950) 577
1023 小熊维尼和大风天 (1968) 221
1024 三个骑士,(1945 年)126
1025 石中剑 (1963) 293
1026 心心相印 (1949) 8
1027 罗宾汉:盗贼王子 (1991) 344
1028 欢乐满人间 (1964) 1011
1029 小飞象 (1941) 568
103 难忘 (1996) 33
1030 皮特的龙 (1977) 323
1031 床 Handlebars 和扫帚 (1971) 319`
最佳答案
如果您将 int cnt=0;
移动到 reduce 方法中(作为此方法的第一条语句),您将获得每个键的前 10 个值(我想这就是您想要的).
否则,就像现在一样,您的计数器将继续增加,您将仅跳过第 11 个值(无论键如何),继续第 12 个。
如果你只想打印 10 个值(不管键是什么),你将 cnt
初始化保留在原处,并将你的 if
条件更改为 if (cnt > 10)
...
但是,这不是一个好的做法,因此您可能需要重新考虑您的算法。 (假设你不想要 10 个随机值,当你有超过 1 个 reducer 和哈希分区器时,你怎么知道在分布式环境中首先处理哪个键?)
关于java - 计数器在 reducer 代码中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46087100/