java - Mongo hadoop mapreduce 显示错误

标签 java mongodb hadoop mapreduce nosql

我对大数据和 NOSQL 领域很陌生,我正在尝试一个示例程序

我正在尝试从我的 mongo 数据库中获取详细信息。以下是我的数据库架构--

  { "_id" : ObjectId("51d11c95e82449edcf7640bc"), "Called_Number" : NumberLong("7259400112"), "Calling_Number" : NumberLong("9008496311"), "Date" : "22-Apr-13", "Time" : "10:21:43", "Duration" : "4:36" }

现在我尝试从数据库中获取值并运行映射缩减作业,以便我可以找到如下所示的详细信息

{ "调用号码":7259400112 , 被叫号码: "9008496311"频率: "3"}

以下是我正在尝试的

package callcircle;

import java.io.*;
import java.util.*;

import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.bson.*;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.*;

public class call {

    private static final Log log = LogFactory.getLog(call.class);

    public static class TokenizerMapper extends
            Mapper<Object, Object, Text, IntWritable> {


        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();


        public void map(Object calling_number, Object called_number,
                Context context) throws IOException, InterruptedException {
            System.out.println("entering method");


        //  calling_number = (Object) calling_number).get("Calling_Number");
            called_number = ((BSONWritable) called_number).get("Called_Number");

            String CallNumer01 = called_number.toString();

            String[] recips = CallNumer01.split(",");



            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {


                    // context.write(new CallPair(calling_number, recip), new IntWritable(1));
                    // word.set(CallNumer01); context.write( word, one );

                    //System.out.println("After mapping");

                }
            }
        }
    }

    public class CallReducer extends
        Reducer<CallPair, IntWritable, BSONWritable, IntWritable> {

        public void reduce(final CallPair pKey,
                final Iterable<IntWritable> pValues, final Context pContext)
                throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable value : pValues) {
                sum += value.get();
            }
            @SuppressWarnings("static-access")
            BSONObject outDoc = new BasicDBObjectBuilder().start()
                    .add("f", pKey.calling_number).add("t", pKey.called_number)
                    .get();
            BSONWritable pkeyOut = new BSONWritable(outDoc);
            pContext.write(pkeyOut, new IntWritable(sum));
        }

    }



    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        final Configuration conf = new Configuration();
        System.out.println("Conf1: " + conf);
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost/CDR.in1");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost/CDR.out");
        System.out.println("Conf: " + conf);

        final Job job = new Job(conf, "CDR");

        job.setJarByClass(call.class);
        System.out.println("Conf2: " + conf);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(CallReducer.class);
        job.setReducerClass(CallReducer.class);
        System.out.println("Conf3: " + conf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("Conf3: " + conf);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        System.out.println("Conf4: " + conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        System.out.println("Conf6: " + conf);
    }

}

但是我收到以下错误

In Main
Conf1: Configuration: core-default.xml, core-site.xml
Conf: Configuration: core-default.xml, core-site.xml
13/07/01 19:04:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
Conf2: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf4: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
13/07/01 19:04:27 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/07/01 19:04:27 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/07/01 19:04:28 INFO util.MongoSplitter:  Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
13/07/01 19:04:28 INFO mapred.JobClient: Running job: job_local_0001
13/07/01 19:04:28 INFO util.MongoSplitter:  Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
should setup context
13/07/01 19:04:28 INFO input.MongoInputSplit: Deserialized MongoInputSplit ... { length = 9223372036854775807, locations = [localhost], keyField = _id, query = { "$query" : { }}, fields = { }, sort = { }, limit = 0, skip = 0, noTimeout = false}
13/07/01 19:04:28 INFO mapred.MapTask: io.sort.mb = 100
13/07/01 19:04:28 INFO mapred.MapTask: data buffer = 79691776/99614720
13/07/01 19:04:28 INFO mapred.MapTask: record buffer = 262144/327680
entering method
13/07/01 19:04:28 WARN mapred.LocalJobRunner: job_local_0001
java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.hadoop.io.BSONWritable
    at callcircle.call$TokenizerMapper.map(call.java:36)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
13/07/01 19:04:29 INFO mapred.JobClient:  map 0% reduce 0%
13/07/01 19:04:29 INFO mapred.JobClient: Job complete: job_local_0001
13/07/01 19:04:29 INFO mapred.JobClient: Counters: 0

请有人指导我哪里错了?

谢谢

最佳答案

如果映射器和化简器不使用相同的输出类型,则必须显式指定映射器键/值类型 - 因此您可能还需要添加:

setMapOutputKeyClass(Text.class)
setMapOutputValueClass(IntWritable.class)

关于java - Mongo hadoop mapreduce 显示错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17406063/

相关文章:

java - 记录事件的 SLF4J 版本

git - git 存储库上的 MongoDB 数据库

python - 我是否需要安装 Hadoop 才能使用 Pyspark 的所有功能?

hadoop - 当文件添加到指向目录时,外部配置单元表是否会自行刷新

hadoop - 如何在 Apache mahout 中合并两个相似实例

java - JUnit 5 在 Debug模式下截断堆栈跟踪并使 JVM 崩溃

java - 将未知对象转换为 boolean 值

java - Android开发: Getting the name of Images

node.js - Mongoose 选择 : false not working on location nested object

node.js - MongoDB 位置附近独特