java - 如何使用 MapReduce 将 CSV 导入 HBASE 表

标签 java csv hadoop mapreduce hbase

您好,我是 hadoop 的新手,我正在尝试使用 MapReduce 将 csv 表导入到 Hbase。

我正在使用 hadoop 1.2.1 和 hbase 1.1.1

我有以下格式的数据:

Wban Number, YearMonthDay, Time, Hourly Precip

03011,20060301,0050,0

03011,20060301,0150,0

我写了下面的批量加载代码

public class BulkLoadDriver extends Configured implements Tool{

public static void main(String [] args) throws Exception{


    int result= ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);
}

public static enum COUNTER_TEST{FILE_FOUND, FILE_NOT_FOUND};
public String tableName="hpd_table";// name of the table to be inserted in hbase

@Override
public int run(String[] args) throws Exception {

    //Configuration conf= this.getConf();

    Configuration conf = HBaseConfiguration.create();
    Job job= new Job(conf,"BulkLoad"); 
    job.setJarByClass(getClass());

    job.setMapperClass(bulkMapper.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);


    TableMapReduceUtil.initTableReducerJob(tableName, null, job);   //for HBase table
    job.setNumReduceTasks(0);
    return (job.waitForCompletion(true)?0:1);


}
private static class bulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
    //static class bulkMapper extends TableMapper<ImmutableBytesWritable, Put> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String [] val= value.toString().split(",");


        // store the split values in the bytes format so that they can be added to the PUT object
        byte[] wban=Bytes.toBytes(val[0]);
        byte[] ymd= Bytes.toBytes(val[1]);
        byte[] tym=Bytes.toBytes(val[2]);
        byte[] hPrec=Bytes.toBytes(val[3]);

        Put put=new Put(wban);
        put.add(ymd, tym, hPrec);

        System.out.println(wban);
        context.write(new ImmutableBytesWritable(wban), put);

        context.getCounter(COUNTER_TEST.FILE_FOUND).increment(1);

    }

}

我为此创建了一个 jar 并在终端中运行:

hadoop jar ~/hadoop-1.2.1/MRData/bulkLoad.jar bulkLoad.BulkLoadDriver/MR/input/200603hpd.txt hpd_table

但我得到的输出是数百行以下类型的行: attempt_201509012322_0001_m_000000_0: [B@2d22bfc8 attempt_201509012322_0001_m_000000_0:[B@445cfa9e

我不确定它们是什么意思以及如何执行此批量上传。请帮忙。

提前致谢。

最佳答案

有几种方法可以将数据导入到 HBase 中。请查看以下链接:

http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_hbase_import.html

HBase 批量加载:

  1. CSV 格式的数据文件

  2. 将您的数据处理成 HFile 格式。参见 http://hbase.apache.org/book/hfile_format.html有关 HFile 格式的详细信息。通常您使用 MapReduce 作业进行转换,并且您经常需要自己编写 Mapper,因为您的数据是唯一的。作业必须发出行键作为键,以及键值、放置或删除作为值。 Reducer 由 HBase 处理;使用 HFileOutputFormat.configureIncrementalLoad() 配置它并执行以下操作:

    • 检查表以配置总顺序分区器
    • 将分区文件上传到集群并将其添加到 分布式缓存
    • 设置reduce任务的数量以匹配当前的任务数量 地区
    • 设置输出键/值类以匹配 HFileOutputFormat 要求
    • 设置 Reducer 以执行适当的排序 (KeyValueSortReducer 或 PutSortReducer)
  3. 在输出文件夹中为每个区域创建一个 HFile。输入数据几乎完全重写,因此您需要的可用磁盘空间至少是原始数据集大小的两倍。例如,对于 mysqldump 的 100 GB 输出,您应该在 HDFS 中至少有 200 GB 的可用磁盘空间。您可以在过程结束时删除原始输入文件。

  4. 将文件加载到 HBase。使用 LoadIncrementalHFiles 命令(通常称为 completebulkload 工具),向其传递一个在 HDFS 中定位文件的 URL。每个文件都加载到该区域的 RegionServer 上的相关区域中。您可以通过传递 --versions= N 选项来限制加载的版本数,其中 N 是要包含的最大版本数,从最新到最旧(最大时间戳到最小时间戳)。 如果在创建文件后分割了一个区域,该工具会根据新的边界自动分割 HFile。此过程效率低下,因此如果您的表正在被其他进程写入,您应该在转换步骤完成后立即加载。

关于java - 如何使用 MapReduce 将 CSV 导入 HBASE 表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32326465/

相关文章:

java - 使用 open csv 时从 csv 中删除双引号

hadoop - 随机选择HIVE中的行,其中一半必须满足条件

hadoop - Sort 在 MapReduce 阶段用在什么地方,为什么?

hadoop - 用于 Hadoop Streaming 的 Go 客户端

java - 欧几里得算法是如何工作的?

java - 使用 PDFBox 比较两个 PDF 文件文本失败,即使两个文件具有相同的文本

java - 连接池 : is it appropriate

Java获取x和y坐标

python - 如何在 pandas df 中插入第二个标题行以进行 csv 写入

powershell - 使用 PowerShell 过滤具有多个条件的 CSV 文件