java - 如何使用 TableMapReduceUtil 在 hbase 扫描器结果上运行 mapreduce

标签 java hadoop mapreduce hbase hdfs


我的 hbase 表是这样的:

    key---------value
    id1/bla     value1
    id1/blabla  value2
    id2/bla     value3
    id2/blabla  value4
    ....

有数百万个以 id1 开头的键和数百万个以 id2 开头的键。

我想用 mapReduce 从 hbase 读取数据,因为有很多键以相同的 ID 和每个 ID 一张 map 是不够的。我更喜欢每个 Id 100 个映射器

我希望超过 1 个映射器将在已按 id 过滤的同一个 scannerResult 上运行。 我阅读了 TableMapReduceUtil 并尝试了以下操作:

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
    sourceTable,        // input table
    scan,               // Scan instance to control CF and attribute selection
    MyMapper.class,     // mapper class
    Text.class,         // mapper output key
    IntWritable.class,  // mapper output value
    job);


使用 map 函数看起来像这样(它应该迭代扫描结果):

public static class MyMapper extends TableMapper<Text, IntWritable>  {

    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
            text.set("123");     // we can only emit Writables...    
            context.write(text, ONE);
    }
}
<br>



我的问题是:

  1. map 函数怎么可能得到输入结果而不是 ResultScanner?我知道扫描的结果可以由 ResultScanner 迭代,而 ResultScanner 可以由 Result 迭代。 ResultScanner 有结果列表\数组,不是吗?
  2. 如何在 map 函数中迭代扫描仪的结果?
  3. 我如何控制此函数的拆分数量。如果它只打开 10 个映射器而我想要 20 个,是否可以更改某些内容?
  4. 有没有最简单的方法可以实现我的目标?

最佳答案

我将从您列表中的第 4 位开始:

默认行为是为每个区域创建一个映射器。因此,与其尝试修改 TableInputFormat 以根据您的规范创建自定义输入拆分,您应该首先考虑将数据拆分为 100 个区域(然后您将拥有 100 个相当平衡的映射器)。

这种方法提高了您的读写性能,因为您不太容易受到热点的影响(假设您的集群中有一个或两个以上的区域服务器)。

解决此问题的首选方法是预拆分表(即在创建表时定义拆分)。

关于java - 如何使用 TableMapReduceUtil 在 hbase 扫描器结果上运行 mapreduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39066815/

相关文章:

Hadoop 辅助 NameNode IP 地址

Hadoop:hdfs dfs -text 的倒数

java - MockHttpServletRequest mc = new MockHttpServletRequest() = >未知类型 "org.springframework.mock.web.MockHttpServletRequest"

java - 日期/日历的 Hadoop 可写

java - 无法实例化类型 [简单类型,类 java.time.LocalDate

Hadoop->Mapper->我们如何从给定的输入路径中只读取每个文件的前 N ​​行?

java - 如何在不解压缩所有依赖项的情况下使用 Maven 组装控制台应用程序?

java - 在不使用context.write的情况下使用MultipleOutputs将导致空文件

hadoop - Hbase - 通过列名前缀获取行的列名

mongodb - 是否有可能在 mongo 中获取 map 减少进度通知?