hadoop reducer 不考虑两个相等的自定义可写对象相等

标签 hadoop mapreduce writable

我正在尝试编写一个 map reduce 程序来检查共同的 friend 。
我使用自定义可写(FriendPair)作为键。
给定以下输入

Tom Jerry,John
John Jerry,Sarah,Tom
它应该输出 Jerry 作为 Tom 和 John 的共同 friend
[John,Tom]    Jerry
[John,Sarah]    
[John,Jerry]
[Tom,Jerry] 
相反,map reduce 输出以下内容
[John,Tom]  
[John,Sarah]    
[John,Jerry]    
[Tom,John]  
[Tom,Jerry]
键 [John,Tom] 和 [Tom,John] 被认为是不相等的。
下面是代码
自定义可写
    public class FriendPair implements WritableComparable<FriendPair> {
        
        Text friend1;
        Text friend2;
        
        public FriendPair() {
            this.friend1 = new Text("");
            this.friend2 = new Text("");
        }
        
        public FriendPair(Text friend1, Text friend2) {
            this.friend1 = friend1;
            this.friend2 = friend2;
        }
        
        public Text getFriend1() {
            return friend1;
        }
        public void setFriend1(Text friend1) {
            this.friend1 = friend1;
        }
        public Text getFriend2() {
            return friend2;
        }
        public void setFriend2(Text friend2) {
            this.friend2 = friend2;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            friend1.write(out);
            friend2.write(out);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            friend1.readFields(in);
            friend2.readFields(in);
        }
    
        @Override
        public int compareTo(FriendPair pair2) {
            return ((friend1.compareTo(pair2.getFriend2()) == 0 && friend2.compareTo(pair2.getFriend1()) == 0)
                   || (friend1.compareTo(pair2.getFriend1()) == 0 && friend2.compareTo(pair2.getFriend2()) == 0)) ? 0 : -1;
        }
    
        @Override
        public boolean equals(Object o) {
            FriendPair pair2 = (FriendPair) o;
            return (friend1.equals(pair2.getFriend2()) && friend2.equals(pair2.getFriend1()) 
                    || friend1.equals(pair2.getFriend1()) && friend2.equals(pair2.getFriend2()));
        }
        
        @Override
        public String toString() {
            return "[" + friend1 + "," + friend2 + "]";
        }
        
        @Override
        public int hashCode() {
            return friend1.hashCode() + friend2.hashCode();
        }
    
    }
映射器
public class MutualFriendsMapper extends Mapper<LongWritable, Text, FriendPair, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String[] items = line.split("\t");

        String name = items[0];
        String friendsList = items[1];
        String[] friends = friendsList.split(",");
        for (String friend : friends) {
            FriendPair fp = new FriendPair(new Text(name), new Text(friend));
            FriendPair fp2 = new FriendPair(new Text(friend), new Text(name));
            context.write(fp, new Text(friendsList));
        }
    }
}
reducer
public class MutualFriendsReducer extends Reducer<FriendPair, Text, FriendPair, FriendArray> {

    @Override
    public void reduce(FriendPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        
        List<String> allFriends = new ArrayList<String>();
        for(Text value : values) {
            String[] valueArray = value.toString().split(",");
            allFriends.addAll(Arrays.asList(valueArray));
        }
        List<Text> commonFriends = new ArrayList<Text>();
        Set<String> uniqueFriendSet = new HashSet<String>(allFriends);
        for(String friend : uniqueFriendSet) {
            int frequency = Collections.frequency(allFriends, friend);
            if(frequency > 1) {
                commonFriends.add(new Text(friend));
            }
        }
        
        context.write(key, new FriendArray(Text.class, commonFriends.toArray(new Text[commonFriends.size()])));
    }
}
FriendArray(输出)
public class FriendArray extends ArrayWritable {

    public FriendArray(Class<? extends Writable> valueClass, Writable[] values) {
        super(valueClass, values);
    }
    
    public FriendArray(Class<? extends Writable> valueClass) {
        super(valueClass);
    }
    
    public FriendArray() {
        super(Text.class);
    }

    @Override
    public Text[] get() {
        return (Text[]) super.get();
    }
    
    @Override
    public void write(DataOutput data) throws IOException {
        for(Text t : get()) {
            t.write(data);
        }
    }
    
    @Override
    public String toString() {
        Text[] friendArray = Arrays.copyOf(get(), get().length, Text[].class);
        String print="";
        
        for(Text f : friendArray) 
            print+=f+",";
        
        return print;
    }
}
任何帮助将不胜感激。

最佳答案

在“排序”阶段,Hadoop 不对 java 对象进行操作,而仅对它们的字节表示(FriendPair.write() 方法的输出)进行操作,因此它不能调用 FriendPair.equals() .因此,为了让 Hadoop 理解键 [John,Tom] 和 [Tom,John] 相等,您必须确保它们的 write输出是相同的。实现此目的的一种方法是强制执行配对中 friend 的顺序,例如按字母顺序对它们进行排序(然后两个配对看起来都是 [John,Tom])。

关于hadoop reducer 不考虑两个相等的自定义可写对象相等,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64165809/

相关文章:

docker - spark-submit如何在群集模式下传递--driver-class-path?

hadoop - 如何设置OutputCommitter配置?

hadoop - map() 和 reduce() 应该返回相同类型的键/值对吗?

hadoop - yarn 不尊重 yarn.nodemanager.resource.cpu-vcores

java - 可以直接使用DataInput和DataOutput,为什么还要使用Writable?

java - 你如何处理 reducer 中的不同值类型

java - 在 hadoop.io api 的可写类中使用 readFields()

hadoop - Apache Pig 中的连接错误

azure - HDInsight字数映射减少卡住映射器的程序的程序100%和缩减器0%

java - JSON映射器类中的错误