java - 在 HBase 单元中保存多个版本

标签 java hadoop mapreduce hbase apache-zookeeper

我是 HBase 新手。我正在尝试在 HBase 的单元格中保存多个版本,但我只获取最后保存的值。我尝试了以下两个命令来检索多个保存的版本: 获取 'Dummy1','abc', {COLUMN=>'backward:first', VERSIONS=>12}扫描 'Dummy1', {VERSIONS=>12} 两者都返回输出如下:

ROW                   COLUMN+CELL                                               
 abc                  column=backward:first, timestamp=1422722312845, value=rrb 

0.0150 秒内 1 行 输入文件如下:

abc xyz kkk
abc qwe asd
abc anf rrb

HBase中建表的代码如下:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class HBaseTableCreator {

  public static void main(String[] args) throws Exception {

      HBaseConfiguration conf = new HBaseConfiguration();
      conf.set("hbase.master","localhost:60000");

      HBaseAdmin hbase = new HBaseAdmin(conf);
      HTableDescriptor desc = new HTableDescriptor("Dummy");
      HColumnDescriptor meta = new HColumnDescriptor("backward".getBytes());
      meta.setMaxVersions(Integer.MAX_VALUE);
      HColumnDescriptor prefix = new HColumnDescriptor("forward".getBytes());
      prefix.setMaxVersions(Integer.MAX_VALUE);
      desc.addFamily(meta);
      desc.addFamily(prefix);
      hbase.createTable(desc);

 }

}

转储HBase中数据的代码如下: 主要类别: 导入java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class TestMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
    {
        // TODO Auto-generated method stub
        Configuration conf=new Configuration();
        //HTable hTable = new HTable(conf, args[3]);  
        String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length!=2)
        {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job=new Job(conf,"HBase dummy dump");
        job.setJarByClass(TestMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class); 
        job.setMapperClass(TestMapper.class);
        TableMapReduceUtil.initTableReducerJob("Dummy", null, job);
        //job.setOutputKeyClass(NullWritable.class);
        //job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        //job.setOutputKeyClass(Text.class);
        //job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        //HFileOutputFormat.configureIncrementalLoad(job, hTable);
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

映射器类:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class TestMapper extends Mapper <LongWritable, Text, Text, Put>{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

        String line=value.toString();
        String[] l=line.split("\\s+");
        for(int i=1;i<l.length;i++)
        {
            Put HPut = new Put(l[0].getBytes());
            HPut.add("backward".getBytes(),"first".getBytes(),l[i].getBytes());
            context.write(new Text(l[0]),HPut);
        }
    }
}

请告诉我哪里出错了。

最佳答案

您的问题是您的写入会自动批处理,并且在作业结束时(当表关闭时)被刷新,可能导致每个 put 操作具有完全相同的时间戳,并且它们基本上是覆盖(编写与另一个版本具有相同时间戳的版本会覆盖该版本,而不是插入新版本)。

解决该问题的第一种方法可能是自己提供时间戳 Put HPut = new Put(l[0].getBytes(), System.currentTimeMillis());但您可能会遇到同样的问题,因为操作速度太快,以至于很多 put 都会具有相同的时间戳。

这就是我要克服这个问题的方法:

1- 停止使用TableMapReduceUtil.initTableReducerJob支持处理 hbase 表写入的自定义 reducer 。

2-修改映射器以将每行的所有值写入上下文,以便将它们分组为一个可迭代对象并传递给 reducer (即: abc, xyz kkk qwe asd anf rrb )

3-实现我自己的 reducer ,其工作方式有点像这样伪代码:

Define myHTable
setup() {
  Instantiate myHtable
  Disable myHtable autoflush to prevent puts from being automatically flushed
  Set myHtable write buffer to at least 2MB
}
reduce(rowkey, results) {
  baseTimestamp = current time in milliseconds
  Iterate results {
     Instantiate put with rowkey ++baseTimestamp
     Add result to put
     Send put to myHTable
  }
}
cleanup() {
  Flush commits for myHTable
  Close myHTable
}

这样,每个版本之间总是有 1ms,你唯一需要注意的是,如果你有大量版本并多次运行同一个作业,新作业的时间戳可能会重叠前一个作业的时间戳,如果您预计版本少于 30k,则不必担心,因为每个作业与下一个作业之间至少相隔 30 秒...

无论如何,请注意,不建议拥有超过一百个版本( http://hbase.apache.org/book.html#versions ),如果您需要更多版本,那么采用高方法(包含键+时间戳的复合行键)会更明智,而无需版本全部。

抱歉,格式很奇怪,这是让伪代码很好地显示的唯一方法。

关于java - 在 HBase 单元中保存多个版本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28253946/

相关文章:

java - 如果 Activity/UI 空闲一段时间(可能很长),UI 线程会做什么

java - 如何按第一列对 excel 文件进行排序? java

python - 如何将目录上传到HDFS

rest - 对 Yarn 的 REST API 返回的结果进行排序

unix - hive 和UNIX脚本

hadoop - 如何从Mapreduce中的Excel文件读取?

java - 将具有可变长度记录的文件转换为Java中的固定长度记录

java - 为 Windows 更改 JButton 的禁用前景(字体)颜色

hadoop:如何增加失败任务的限制

javascript - Mongodb 对每个文档数组中的所有相同字段求和 - Map-Reduce