java - 计划运行一次的映射器意外地多次执行

标签 java hadoop mapreduce mapper

我试图编写一个非常简单的作业,仅使用1个映射器,而没有reducer将一些数据写入hbase。在映射器中,我试图简单地使用hbase打开连接,将几行数据写入表中,然后关闭连接。在作业驱动程序中,我正在使用JobConf.setNumMapTasks(1);和JobConf.setNumReduceTasks(0);指定将仅执行1个映射器,而不执行reducer。我还在jobConf中将reducer类设置为IdentityReducer。我观察到的奇怪行为是该作业已成功将数据写入hbase表,但是之后,我在日志中看到它不断尝试打开与hbase的连接,然后关闭连接,该连接持续20-30分钟,在该作业之后宣布已成功完成100%。最后,当我检查由放置在OutputCollector.collect(...)中的虚拟数据创建的_success文件时,我看到数百行的虚拟数据应该只有1行。
以下是作业驱动程序的代码

    public int run(String[] arg0) throws Exception {
        Configuration config = HBaseConfiguration.create(getConf());
        ensureRequiredParametersExist(config);
        ensureOptionalParametersExist(config);

        JobConf jobConf = new JobConf(config, getClass());
        jobConf.setJobName(config.get(ETLJobConstants.ETL_JOB_NAME));
        //set map specific configuration
        jobConf.setNumMapTasks(1);
        jobConf.setMaxMapAttempts(1);
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setMapperClass(SingletonMapper.class);
        jobConf.setMapOutputKeyClass(LongWritable.class);
        jobConf.setMapOutputValueClass(Text.class);

        //set reducer specific configuration
        jobConf.setReducerClass(IdentityReducer.class);
        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        jobConf.setNumReduceTasks(0);

        //set job specific configuration details like input file name etc
        FileInputFormat.setInputPaths(jobConf, jobConf.get(ETLJobConstants.ETL_JOB_FILE_INPUT_PATH));
        System.out.println("setting output path to : " + jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH));
        FileOutputFormat.setOutputPath(jobConf,
                new Path(jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH)));
        JobClient.runJob(jobConf);
        return 0;
    }

驱动程序类扩展了配置并实现了工具(我使用了权威指南中的示例)以下是我的映射器类中的代码。

以下是我在Mapper的map方法中的代码,其中我只是打开与Hbase的连接,进行一些初步检查以确保表存在,然后写入行并关闭表。
    public void map(LongWritable arg0, Text arg1,
        OutputCollector<LongWritable, Text> arg2, Reporter arg3)
        throws IOException {


    HTable aTable = null;
    HBaseAdmin admin = null;


    try {

        arg3.setStatus("started");

        /*
         * set-up hbase config
         */
        admin = new HBaseAdmin(conf);

        /*
         * open connection to table
         */
        String tableName = conf.get(ETLJobConstants.ETL_JOB_TABLE_NAME);

        HTableDescriptor htd = new HTableDescriptor(toBytes(tableName));
        String colFamilyName = conf.get(ETLJobConstants.ETL_JOB_TABLE_COLUMN_FAMILY_NAME);

        byte[] tablename = htd.getName();
        /* call function to ensure table with 'tablename' exists */

        /*
         * loop and put the file data into the table
         */
        aTable = new HTable(conf, tableName);

        DataRow row = /* logic to generate data */
        while (row != null) {
            byte[] rowKey = toBytes(row.getRowKey());
            Put put = new Put(rowKey);
            for (DataNode node : row.getRowData()) {
                put.add(toBytes(colFamilyName), toBytes(node.getNodeName()),
                        toBytes(node.getNodeValue()));
            }
            aTable.put(put);
            arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo added another data row to hbase");
            row = fileParser.getNextRow();
        }
        aTable.flushCommits();
        arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo Finished adding data to hbase");

    } finally {
        if (aTable != null) {
            aTable.close();
        }

        if (admin != null) {
            admin.close();
        }
    }

    arg2.collect(new LongWritable(10), new Text("something"));
    arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxoadded some dummy data to the collector");
}

如您所见,到最后,我正在将一些虚拟数据写到集合中(10,“某物”),并且在作业终止后,我在_success文件中看到了数百行该数据。
我无法确定为什么映射器代码会一遍又一遍地重启,而不是一次运行。任何帮助将不胜感激。

最佳答案

JobConf.setNumMapTasks(1)(实际上定义您指定的编号)不同,使用setNumReduceTasks只是想告诉hadoop您希望使用1个映射器(如果可能)。

这就是为什么要运行更多映射器并观察所有这些数字的原因。

有关更多详细信息,请阅读this post

关于java - 计划运行一次的映射器意外地多次执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31174997/

相关文章:

java - JFrame/JPanel 的尺寸不正确

java - getCurrentSession 在每次调用时创建新连接

hadoop - 如何将 Double[] 插入到 HBase 中?

hadoop - 如何在HA模式下配置HBase?

f# - 将函数列表缩减为 bool 值

java - 解决数独程序

java - Android:防止剪贴板复制文本

hadoop - 无法实例化org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

hadoop - 是否所有具有相同行键的数据都存储在同一个节点中?

java - hadoop mapreduce IntWritable范围有多长?