hadoop - 映射器与DataStax Cassandra 1.2.1一起无限发展

标签 hadoop cassandra

我的cassandra scehma列系列中只有一行。运行mapreduce时,映射器会不断读取同一行。因此,映射器进入无穷大且 reducer 卡住了。

这些是使用的配置

conf.set("fs.default.name", "hdfs://28.151.181.107:9000");
    conf.set("mapred.job.tracker", "28.151.181.107:9001");
    conf.setJar("C:\\hadoop-test\\demo\\target\\demo-0.0.1-SNAPSHOT.jar");

    conf.setMapperClass(TokenizerMapper.class);
    conf.setCombinerClass(ReducerToFilesystem.class);
    conf.setReducerClass(ReducerToFilesystem.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);

    FileOutputFormat.setOutputPath(conf, new Path(resultFileName));

    conf.setInputFormat(ColumnFamilyInputFormat.class);

    ConfigHelper.setInputRpcPort(conf, PORT + "");
    ConfigHelper.setInputInitialAddress(conf, HOST);
    ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
    ConfigHelper.setInputColumnFamily(conf, KEY_SPACE, COLUMN_FAMILY,true);
    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(conf, predicate);
    ConfigHelper.setOutputInitialAddress(conf, HOST);
    ConfigHelper.setOutputPartitioner(conf, "RandomPartitioner");

和Mapper&Reducer是
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
            OutputCollector<Text, Text> paramOutputCollector,
            Reporter paramReporter) throws IOException {

        DateSerializer sz = new DateSerializer();
        StringSerializer s = new StringSerializer();

        for (IColumn col : columns.values()) {
            Date name = sz.fromByteBuffer(col.name());

            double value = ByteBufferUtil.toDouble(col.value());
            paramOutputCollector.collect(new Text(s.fromByteBuffer(key)),
                    new Text(name.toGMTString() + " [] []  " + value));
        }

    }


public static class ReducerToFilesystem implements
        Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> paramOutputCollector,
            Reporter paramReporter) throws IOException {
        StringBuffer bfr = new StringBuffer();
        while (values.hasNext()) {
            Text val = values.next();
            bfr.append(val);
            bfr.append("<--->");

        }

        paramOutputCollector.collect(key, new Text(bfr.toString()));

    }

请指导。

谢谢您的帮助!

最佳答案

我已经调试了一下,我认为您是对的。即使在1.2.9版本中,分页也无法正确完成。

关于hadoop - 映射器与DataStax Cassandra 1.2.1一起无限发展,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16572235/

相关文章:

hadoop - Knox Gateway 数据库连接器

java - 如何在 Mahout Spark 上编写推荐

hadoop - 如何使用oozie检查文件是否存在于HDFS位置?

web-services - Hadoop前面的服务

cassandra - 为什么我不能在过滤主键后在 Cassandra 中添加 WHERE 子句?

sql - 带有 SSTable 附加二级索引的 Cassandra 与关系数据库

cassandra - Memtable 类型和大小分配

cassandra - Kubernetes:多个DC中的Petset Cassandra

hadoop - 如何拆分输入集以获得更好的 Hadoop 平衡?

c++ - 我如何知道 C++ 驱动程序是否丢失了与 Cassandra 的连接?