java - 计数器在 reducer 代码中不起作用

标签 java hadoop mapreduce hadoop2

我正在从事一个大型 hadoop 项目,并且有一个小型 KPI,我必须在减少输出中仅写入前 10 个值。 为了完成这个要求,我使用了一个计数器并在计数器等于 11 时中断循环,但 reducer 仍然将所有值写入 HDFS。

这是一个非常简单的 java 代码,但我卡住了 :(

为了测试,我创建了一个独立的类(java 应用程序)来执行此操作,并且它在那里工作;我想知道为什么它在 reducer 代码中不起作用。

如果我遗漏了什么,请有人帮助我并提出建议。

map - 减少代码

package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ValueSortExp2 {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "Test commond");
        job.setJarByClass(ValueSortExp2.class);

        // Setup MapReduce
        job.setMapperClass(MapTask2.class);
        job.setReducerClass(ReduceTask2.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntComparator2.class);
        // Input
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        job.setOutputFormatClass(TextOutputFormat.class);


        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

    public static class IntComparator2 extends WritableComparator {

        public IntComparator2() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

            Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
            Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

            return v1.compareTo(v2) * (-1);
        }
    }

    public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {

            public void  map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {

                String tokens[]= value.toString().split("\\t");

            //    int empId = Integer.parseInt(tokens[0])    ;    
                int count = Integer.parseInt(tokens[2])    ;

                context.write(new IntWritable(count), new Text(value));

            }    

        }


    public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
        int cnt=0;
        public void reduce(IntWritable key, Iterable<Text> list, Context context)
                throws java.io.IOException, InterruptedException {


            for (Text value : list ) {
                cnt ++;

                if (cnt==11)
                {
                    break;    
                }

                context.write(new IntWritable(cnt), value);




            }

        }
}
}  

简单的 JAVA 代码运行良好

package comparableTest;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class TestData {

    //static int cnt=0;


    public static void main(String args[]) throws IOException, InterruptedException {

        ArrayList<String> list = new ArrayList<String>() {{
            add("A");
            add("B");
            add("C");
            add("D");
        }};


        reduce(list);


    }

    public static void reduce(Iterable<String> list)
            throws java.io.IOException, InterruptedException {


        int cnt=0;
        for (String value : list ) {
            cnt ++;

            if (cnt==3)
            {
                break;    
            }

            System.out.println(value);    


        }

    }
}

示例数据 -- 标题只是更多信息,实际数据来自第 2 行

`ID NAME COUNT(需要显示top 10 desc)

1 玩具总动员 (1995) 2077

10 黄金眼 (1995) 888

100 市政厅 (1996) 128

1000 凝结 (1996) 20

1001 Associate, The (L'Associe)(1982) 0

1002 埃德的下一步行动(1996 年)8

1003 极端措施 (1996) 121

1004 闪光人 (1996) 101

1005 D3:威猛鸭子 (1996) 142

1006 室,(1996) 78

1007 苹果饺子帮 (1975) 232

1008 戴维·克罗克特,荒野之王 (1955) 97

1009逃往魔女山(1975)291

101 瓶火箭 (1996) 253

1010 爱虫 (1969) 242

1011 赫比再次骑行 (1974) 135

1012 老耶勒 (1957) 301

1013 parent 陷阱,(1961)258

1014 盲目乐观 (1960) 136

1015 归途:不可思议的旅程 (1993) 234

1016 毛茸茸的狗 (1959) 156

1017 瑞士家庭罗宾逊 (1960) 276

1018 那只该死的猫! (1965) 123

1019 海底 20,000 里格(1954 年)575

102 错先生(1996)60

1020 酷跑 (1993) 392

1021 外场天使 (1994) 247

1022 灰姑娘 (1950) 577

1023 小熊维尼和大风天 (1968) 221

1024 三个骑士,(1945 年)126

1025 石中剑 (1963) 293

1026 心心相印 (1949) 8

1027 罗宾汉:盗贼王子 (1991) 344

1028 欢乐满人间 (1964) 1011

1029 小飞象 (1941) 568

103 难忘 (1996) 33

1030 皮特的龙 (1977) 323

1031 床 Handlebars 和扫帚 (1971) 319`

最佳答案

如果您将 int cnt=0; 移动到 reduce 方法中(作为此方法的第一条语句),您将获得每个键的前 10 个值(我想这就是您想要的).

否则,就像现在一样,您的计数器将继续增加,您将仅跳过第 11 个值(无论键如何),继续第 12 个。

如果你只想打印 10 个值(不管键是什么),你将 cnt 初始化保留在原处,并将你的 if 条件更改为 if (cnt > 10)... 但是,这不是一个好的做法,因此您可能需要重新考虑您的算法。 (假设你不想要 10 个随机值,当你有超过 1 个 reducer 和哈希分区器时,你怎么知道在分布式环境中首先处理哪个键?)

关于java - 计数器在 reducer 代码中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46087100/

相关文章:

r - 当我尝试在 dplyr group_map 函数中使用 defuse-inject 模式时失败

java - Hadoop 最后一个 map 作业卡住了 - 需要帮助

java - 如何在 OpenStack Swift 中使用 'X-Bulk-Delete: true_value' header ?

java - 幻方最终值始终为真

java - OnActivityResult()

java - 在java中将unix时间戳转换为日期

hadoop - 如何按多列分组,然后在 Hive 中转置

java - 如何终止(或杀死)Java 中的 Hadoop 作业?

hadoop - 启动 MapReduce 作业的不同方式

Hadoop post-hook 和作业完成通知