hadoop - Aggregating two datasets in HDFS and exporting/moving them to HBase

Tags: hadoop

I am new to the Hadoop ecosystem. I have managed to import two different datasets from an RDBMS into HDFS. Now I need to aggregate the datasets and export/move the result to HBase. What is the best way to do this?

I already have a third type of dataset in HBase.

Thanks for the help.

Best answer

You can do this easily with MapReduce code.
Regarding your question:

  • The Mapper receives a data split and returns a set of key/value pairs.

  • The Reducer receives the Mapper's output and produces one key/value pair per key.

  • Usually it is your Reducer task that writes the results (to the file system or to HBase).

    Example:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.IOException;
    import java.util.Calendar;
    
    public class MapReduceExample
    {
    
        public MapReduceExample() {
    
        }
    
        static class MyMapper extends TableMapper<LongWritable, LongWritable>
        {
            private LongWritable ONE = new LongWritable( 1 );
    
            public MyMapper() {
            }
    
            @Override
            protected void map( ImmutableBytesWritable rowkey, Result columns, Context context ) throws IOException, InterruptedException
            {
    
                // Get the timestamp from the row key
                // (ExampleSetup is a helper class from the linked InformIT example that
                // decodes the PageViews row key; it is not reproduced here)
                long timestamp = ExampleSetup.getTimestampFromRowKey(rowkey.get());
    
                // Get hour of day
                Calendar calendar = Calendar.getInstance();
                calendar.setTimeInMillis( timestamp );
                int hourOfDay = calendar.get( Calendar.HOUR_OF_DAY );
    
                // Output the current hour of day and a count of 1
                context.write( new LongWritable( hourOfDay ), ONE );
            }
        }
    
        static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable>
        {
            public MyReducer() {
            }
    
            @Override
            protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
            {
                // Add up all of the page views for this hour
                long sum = 0;
                for( LongWritable count : values )
                {
                    sum += count.get();
                }
    
                // Write out the current hour and the sum
                context.write( key, new LongWritable( sum ) );
            }
        }
    
        public static void main( String[] args )
        {
            try
            {
                // Setup Hadoop
                Configuration conf = HBaseConfiguration.create();
                Job job = Job.getInstance(conf, "PageViewCounts");
                job.setJarByClass( MapReduceExample.class );
    
                // Create a scan
                Scan scan = new Scan();
    
                // Configure the Map process to use HBase
                TableMapReduceUtil.initTableMapperJob(
                        "PageViews",                    // The name of the table
                        scan,                           // The scan to execute against the table
                        MyMapper.class,                 // The Mapper class
                        LongWritable.class,             // The Mapper output key class
                        LongWritable.class,             // The Mapper output value class
                        job );                          // The Hadoop job
    
                // Configure the reducer process
                job.setReducerClass( MyReducer.class );
                job.setCombinerClass( MyReducer.class );
    
                // Setup the output - we'll write to the file system: HOUR_OF_DAY   PAGE_VIEW_COUNT
                job.setOutputKeyClass( LongWritable.class );
                job.setOutputValueClass( LongWritable.class );
                job.setOutputFormatClass( TextOutputFormat.class );
    
                // We'll run just one reduce task, but we could run multiple
                job.setNumReduceTasks( 1 );
    
                // Write the results to a file in the output directory
                FileOutputFormat.setOutputPath( job, new Path( "output" ) );
    
                // Execute the job
                System.exit( job.waitForCompletion( true ) ? 0 : 1 );
    
            }
            catch( Exception e )
            {
                e.printStackTrace();
            }
        }
    }
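
The example above writes the hourly totals to a text file in HDFS. Since the goal here is to land the aggregated result in HBase, the Reducer can also write directly into a table. Below is a minimal sketch of that variant; the target table name "PageViewSummary" and the column family/qualifier "cf"/"count" are made-up names for illustration, not something the original example defines.

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;

    import java.io.IOException;

    // Sketch: a Reducer that writes the aggregated counts straight into HBase.
    // The table "PageViewSummary" with column family "cf" is assumed to exist.
    public class MyHBaseReducer extends TableReducer<LongWritable, LongWritable, ImmutableBytesWritable>
    {
        @Override
        protected void reduce( LongWritable key, Iterable<LongWritable> values, Context context ) throws IOException, InterruptedException
        {
            // Sum the counts for this hour, exactly as in MyReducer above
            long sum = 0;
            for( LongWritable count : values )
            {
                sum += count.get();
            }

            // One row per hour: row key = hour of day, a single column holding the total
            byte[] rowKey = Bytes.toBytes( key.get() );
            Put put = new Put( rowKey );
            put.addColumn( Bytes.toBytes( "cf" ), Bytes.toBytes( "count" ), Bytes.toBytes( sum ) );
            context.write( new ImmutableBytesWritable( rowKey ), put );
        }
    }

To use it, the job setup in main() would call TableMapReduceUtil.initTableReducerJob( "PageViewSummary", MyHBaseReducer.class, job ) in place of the setReducerClass / setOutputFormatClass / FileOutputFormat calls; the combiner can stay as the plain MyReducer, since a TableReducer emits Puts that cannot be combined again. Note that put.addColumn is the HBase 1.x API; older releases use put.add with the same arguments.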
    

    More information:
    http://hbase.apache.org/book.html#mapreduce.example
    http://www.informit.com/articles/article.aspx?p=2262143&seqNum=2

Regarding "hadoop - Aggregating two datasets in HDFS and exporting/moving them to HBase", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32888741/
