java - Hadoop MapReduce : Strange Result when Storing Previous Value in Memory in a Reduce Class (Java)

标签 java hadoop mapreduce object-reference reducers

如果我希望存储迭代器的当前值以与 Reduce 方法中迭代器的下一个值进行比较,Hadoop 要求我克隆它而不是简单地将其引用分配给临时变量。

我要将代码发布到我的 reducer 。

你会看到两部分:

  1. Eclipse中测试的主要方法
  2. 在 Hadoop 中执行的 reduce 方法

你会注意到这两行代码是相同的,除了以下几点:

  1. main 方法从我硬编码到其中的 ArrayList 获取 Iterator,而 reduce 方法从 mapper 方法获取 Iterator。
  2. main 方法当然不会执行 context.write。

这是两者几乎共享的代码:

MMI currentMMI = null;
MMI previousMMI = null;
UltraAggregation currentAggregation = null;

while (values.hasNext()) {
    currentMMI = values.next();
    if (currentAggregation == null) {
        currentAggregation = new UltraAggregation(currentMMI);
    }
    if (previousMMI == null) {
        //previousMMI = new MMI(currentMMI);
        previousMMI = currentMMI;
        continue;
    }
    System.out.println();
    System.out.println("currentMMI = " + currentMMI);
    System.out.println("previousMMI = " + previousMMI);
    System.out.println("equals? " + currentMMI.equals(previousMMI));
    System.out.println("==? " + (currentMMI == previousMMI));
    System.out.println();

    // Business logic goes here and involves a context.write on certain conditions

    previousMMI = currentMMI;
}
//final context.write

您会注意到,在每个循环结束时,我将刚刚使用的 MMI(“currentMMI”)的引用设置为对象变量“previousMMI”。然后,在下一个循环中,我将 next() 的引用设置为 currentMMI。当我在 Eclipse 中执行我的主要方法时,如预期的那样,以下查询评估为 false:

currentMMI == previousMMI;
currentMMI.equals(previousMMI);  

但是,当在 Hadoop 中执行时,对于以下两个查询,currentMMI 和 previousMMI 始终评估为 true:

currentMMI == previousMMI;
currentMMI.equals(previousMMI);

只有当我将 previousMMI = currentMMI 行更改为 previousMMI = new MMI(currentMMI) 时,它们的计算结果才会为 false。(我做了MMI 类的构造函数,本质上是对传入参数的浅克隆)。

为什么在 hadoop 中使用 reducer 而不是在 main 方法中使用时必须克隆而不是设置引用?

我现在要复制粘贴 reducer 类,它有两部分:eclipse 测试的主要方法和 Hadoop 中实际使用的 reduce 方法。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.cisco.webex.hadoop.ultrautility.models.MMI;
import com.cisco.webex.hadoop.ultrautility.models.UltraAggregation;

public class MMIReducer extends Reducer<Text, MMI, Object, UltraAggregation> {
    public static void main(String[] args) {
        ArrayList<MMI> mmis = new ArrayList<MMI>();
        mmis.add(new MMI("961864,1,1,1,D1,10,0,2013-08-02 06:00:00.0,USA,N,N"));
        mmis.add(new MMI("961865,1,1,1,D1,10,1,2013-08-02 07:00:00.0,USA,N,N"));
        mmis.add(new MMI("961866,1,1,1,D1,10,2,2013-08-02 08:00:00.0,USA,N,N"));
        mmis.add(new MMI("961867,1,1,1,D1,10,3,2013-08-02 09:00:00.0,USA,N,N"));
        mmis.add(new MMI("961868,1,1,1,D1,10,4,2013-08-02 10:00:00.0,USA,N,N"));
        mmis.add(new MMI("961869,1,1,1,D1,10,5,2013-08-02 11:00:00.0,USA,N,N"));
        mmis.add(new MMI("961870,1,1,1,D1,10,6,2013-08-02 12:00:00.0,USA,N,N"));
        mmis.add(new MMI("961871,1,1,1,D1,10,7,2013-08-02 13:00:00.0,USA,N,N"));
        mmis.add(new MMI("961872,1,1,1,D1,10,8,2013-08-02 14:00:00.0,USA,N,N"));
        mmis.add(new MMI("961873,1,1,1,D1,10,9,2013-08-02 15:00:00.0,USA,N,N"));

        Iterator<MMI> values = mmis.iterator();

        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI);
            /*
            * THIS DOESNT CAUSE LOGIC ERRORS IN MAIN METHOD
            */
            previousMMI = currentMMI;
        }
        //context.write(null, currentAggregation);
    }

    @Override
    public void reduce(Text key, Iterable<MMI> vals, Context context) throws IOException, InterruptedException {
        Iterator<MMI> values = vals.iterator();

        //key = deviceId
        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                System.out.println("PreviousMMI is null, setting previousMMI to current MMI and continuing");
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI); //Acts as intended
            /*
            * THIS CAUSES ERRORS WHEN EXECUTED THROUGH HADOOP
            */
            previousMMI = currentMMI; // Causes errors
        }
        context.write(null, currentAggregation);
    }
}

这是我在 eclipse 中使用静态值执行 ma​​in 方法 时从 stdout 截断的结果:

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 06:00:00 PDT 2013;Uptime|0.0
equals? false
==? false


currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 08:00:00 PDT 2013;Uptime|2.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
equals? false
==? false

这是我执行 hadoop jar 时截断的结果:

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
equals? true
==? true

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
equals? true
==? true

为什么我必须为 Hadoop 而不是在 Eclipse 中克隆它?

最佳答案

将所有值存储在内存中是非常低效的,因此对象被重用并一次加载一个。参见 this other SO question一个很好的解释。总结:

[...] when looping through the Iterable value list, each Object instance is re-used, so it only keeps one instance around at a given time.

关于java - Hadoop MapReduce : Strange Result when Storing Previous Value in Memory in a Reduce Class (Java),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20863781/

相关文章:

java - Spring Integration Java DSL - 异步执行多个服务激活器?

java - Play Framework 2 Java - Fire And Forget

hadoop - yarn 在 1 小时后自动杀死所有作业,没有错误

javascript - MapReduce 函数 MongoDB NodeJs

java - 为什么这段 Java 代码有效?编译器不会提示关闭

java - 使用 Java 拆分 XML 文件

hadoop - Hadoop 数据节点/名称节点实际可以处理多少 block /对象?

Java 客户端拒绝连接到远程服务器

java - 如何在 Apache Spark 中重置 MapReduce 函数上的迭代器

amazon-web-services - 使用亚马逊弹性mapreduce服务时如何在hadoop中包含第三方库