hadoop - I want to scan lots of data (range based queries). What optimizations can I make while writing the data so that the scan becomes faster?

Tags: hadoop hbase

I have hundreds of millions of rows in HBase and I want to scan millions of rows at a time. What are the best optimization techniques I can apply to make the scan as fast as possible?

Best Answer

We had a similar problem: we needed to scan millions of rows by key, and we solved it with MapReduce. There is no standard solution for this, so we wrote a custom input format that extends InputFormat<ImmutableBytesWritable, Result>. Here is a short description of how we did it.
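
Since the answer only shows individual methods, here is a rough sketch of what the enclosing input format class might look like; the class name MultiplyValueInputFormat and the HTable handling are assumptions, only the extended InputFormat<ImmutableBytesWritable, Result> type comes from the answer:

//Hypothetical sketch: the answer does not show the class declaration, so the
//class name and the table handling are assumptions; only the fact that it
//extends InputFormat<ImmutableBytesWritable, Result> comes from the text.
public class MultiplyValueInputFormat
        extends InputFormat<ImmutableBytesWritable, Result> {

    private HTable table;   //initialized from the job configuration (not shown)

    protected HTable getHTable() {
        return table;
    }

    //getSplits() and createRecordReader() from the snippets below go here
}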

First, you need to create the splits so that each key is processed on the machine hosting the region that contains it:

public List<InputSplit> getSplits(JobContext context) throws IOException {
    context.getConfiguration();

    //read the keys to scan for
    byte[][] filterKeys = readFilterKeys(context);

    if (table == null) {
        throw new IOException("No table was provided.");
    }

    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
        throw new IOException("Expecting at least one region.");
    }

    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
        //get the keys for the current region;
        //they should lie between the region's start and end keys
        byte[][] regionKeys =
                getRegionKeys(keys.getFirst()[i], keys.getSecond()[i],filterKeys);
        if (regionKeys == null) {
            continue;
        }
        String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
                getServerAddress().getHostname();
        //create a split for region
        InputSplit split = new MultiplyValueSplit(table.getTableName(),
                regionKeys, regionLocation);
        splits.add(split);

    }
    return splits;
}
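
The readFilterKeys and getRegionKeys helpers are not shown in the answer. As a rough idea, getRegionKeys might simply keep the requested keys that fall into the region's key range, compared with Bytes.compareTo (this is an assumed implementation, not the original one):

//Hypothetical sketch: the answer does not show getRegionKeys(); this assumed
//implementation keeps only the filter keys that fall inside the region's
//[startKey, endKey) range.
private byte[][] getRegionKeys(byte[] startKey, byte[] endKey,
                               byte[][] filterKeys) {
    List<byte[]> regionKeys = new ArrayList<byte[]>();
    for (byte[] key : filterKeys) {
        boolean afterStart = startKey.length == 0
                || Bytes.compareTo(key, startKey) >= 0;
        //an empty end key means this is the last region of the table
        boolean beforeEnd = endKey.length == 0
                || Bytes.compareTo(key, endKey) < 0;
        if (afterStart && beforeEnd) {
            regionKeys.add(key);
        }
    }
    //returning null lets getSplits() skip regions with no requested keys
    return regionKeys.isEmpty() ? null
            : regionKeys.toArray(new byte[regionKeys.size()][]);
}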

The MultiplyValueSplit class holds the information about the keys and the table:

public class MultiplyValueSplit extends InputSplit
    implements Writable, Comparable<MultiplyValueSplit> {

    private byte[] tableName;
    private byte[][] keys;
    private String regionLocation;
}
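
The constructors, getters and the Writable/InputSplit methods of the split are omitted above. A hedged sketch of what they could look like (everything except the field names is an assumption):

//Hypothetical sketch of the omitted InputSplit/Writable methods of
//MultiplyValueSplit; the original answer does not show them.
@Override
public String[] getLocations() {
    //run the task on the region server hosting these keys
    return new String[] {regionLocation};
}

@Override
public long getLength() {
    //rough size estimate: one unit per key to fetch
    return keys == null ? 0 : keys.length;
}

@Override
public void write(DataOutput out) throws IOException {
    Bytes.writeByteArray(out, tableName);
    out.writeInt(keys.length);
    for (byte[] key : keys) {
        Bytes.writeByteArray(out, key);
    }
    out.writeUTF(regionLocation);
}

@Override
public void readFields(DataInput in) throws IOException {
    tableName = Bytes.readByteArray(in);
    int count = in.readInt();
    keys = new byte[count][];
    for (int i = 0; i < count; i++) {
        keys[i] = Bytes.readByteArray(in);
    }
    regionLocation = in.readUTF();
}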

In the createRecordReader method of the input format class, a MultiplyValuesReader is created; it contains the logic for reading the values from the table.

@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
    HTable table = this.getHTable();
    if (table == null) {
        throw new IOException("Cannot create a record reader because of a" +
                " previous error. Please look at the previous logs lines from" +
                " the task's full log for more details.");
    }

    MultiplyValueSplit mSplit = (MultiplyValueSplit) split;
    MultiplyValuesReader mvr = new MultiplyValuesReader();

    mvr.setKeys(mSplit.getKeys());
    mvr.setHTable(table);
    mvr.init();

    return mvr;
}

The MultiplyValuesReader class contains the logic for reading the data from the HTable:

public class MultiplyValuesReader 
        extends RecordReader<ImmutableBytesWritable, Result> {
    .......

    @Override
    public ImmutableBytesWritable getCurrentKey() {
        return key;
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.results == null) {
            return false;
        }

        while (this.results != null) {
            if (resultCurrentKey >= results.length) {
                this.results = getNextResults();
                continue;
            }

            if (key == null) key = new ImmutableBytesWritable();
            value = results[resultCurrentKey];
            resultCurrentKey++;

            if (value != null && value.size() > 0) {
                key.set(value.getRow());
                return true;
            }

        }
        return false;
    }

    public float getProgress() {
        // Depends on the total number of tuples
        return (keys.length > 0 ? ((float) currentKey) / keys.length : 0.0f);
    }

    private Result[] getNextResults() throws IOException {
        //no more keys left to fetch
        if (currentKey >= keys.length) {
            return null;
        }

        //using batch for faster scan
        ArrayList<Get> batch = new ArrayList<Get>(BATCH_SIZE);
        for (int i = currentKey; 
             i < Math.min(currentKey + BATCH_SIZE, keys.length); i++) {
            batch.add(new Get(keys[i]));
        }

        currentKey = Math.min(currentKey + BATCH_SIZE, keys.length);
        resultCurrentKey = 0;
        return htable.get(batch);
    }

}
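
To actually use this, the job has to be wired up with the custom input format. A minimal driver sketch could look like the following; MultiplyValueInputFormat (the assumed name of the input format described above) and SelectedKeysMapper are placeholders, and the filter keys still have to be made available to readFilterKeys, e.g. through the job configuration (the exact mechanism is not shown in the answer):

//Hypothetical driver sketch; only the Hadoop/HBase classes are real,
//MultiplyValueInputFormat and SelectedKeysMapper are assumed names.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SelectedKeysScanDriver {

    //placeholder mapper: just counts the rows that came back
    public static class SelectedKeysMapper
            extends Mapper<ImmutableBytesWritable, Result, NullWritable, NullWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            context.getCounter("scan", "rows").increment(1);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        //the keys read by readFilterKeys() have to be passed to the job,
        //e.g. via the configuration or the distributed cache (not shown)
        Job job = new Job(conf, "scan selected keys");
        job.setJarByClass(SelectedKeysScanDriver.class);

        //custom format from the answer: emits one <ImmutableBytesWritable, Result>
        //pair per requested row key, read on the region server hosting it
        job.setInputFormatClass(MultiplyValueInputFormat.class);

        job.setMapperClass(SelectedKeysMapper.class);
        job.setOutputFormatClass(NullOutputFormat.class);  //replace with the output you need
        job.setNumReduceTasks(0);                           //map-only scan

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}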

For more details, you can look at the source code of the TableInputFormat, TableInputFormatBase, TableSplit and TableRecordReader classes.

Regarding "hadoop - I want to scan lots of data (range based queries). What optimizations can I make while writing the data so that the scan becomes faster?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/8427348/
