scala - HBASE SPARK 带过滤器的查询,无需加载所有 hbase

标签 scala apache-spark apache-spark-sql hbase

我必须查询 HBASE,然后使用 Spark 和 scala 处理数据。 我的问题是,通过我的解决方案,我获取 HBASE 表的所有数据,然后进行过滤,这不是一种有效的方法,因为它占用了太多内存。所以我想直接做过滤器,我该怎么做?

def HbaseSparkQuery(table: String, gatewayINPUT: String, sparkContext: SparkContext): DataFrame = {

    val sqlContext = new SQLContext(sparkContext)

    import sqlContext.implicits._

    val conf = HBaseConfiguration.create()

    val tableName = table

    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.master", "localhost:60000")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val hBaseRDD = sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])


    val DATAFRAME = hBaseRDD.map(x => {
      (Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("node"), Bytes.toBytes("imei"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("measure"), Bytes.toBytes("rssi"))))

    }).toDF()
      .withColumnRenamed("_1", "GatewayIMEA")
      .withColumnRenamed("_2", "EventTime")
      .withColumnRenamed("_3", "ap")
      .withColumnRenamed("_4", "RSSI")
      .filter($"GatewayIMEA" === gatewayINPUT)

    DATAFRAME
  }

正如您在我的代码中看到的,我在创建数据帧后、加载 Hbase 数据后进行过滤..

提前感谢您的回答

最佳答案

这是我找到的解决方案

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil

object HbaseConnector {

  def main(args: Array[String]): Unit = {

//    System.setProperty("hadoop.home.dir", "/usr/local/hadoop")
    val sparkConf = new SparkConf().setAppName("CoverageAlgPipeline").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sparkContext)

    import sqlContext.implicits._

    val spark = org.apache.spark.sql.SparkSession.builder
      .master("local")
      .appName("Coverage Algorithm")
      .getOrCreate

    val GatewayIMEA = "123"

    val TABLE_NAME = "TABLE"

    val conf = HBaseConfiguration.create()

    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.master", "localhost:60000")
    conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME)

    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(TABLE_NAME))
    val scan = new Scan

    val GatewayIDFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(String.valueOf(GatewayIMEA)))
    scan.setFilter(GatewayIDFilter)

    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

    val hBaseRDD = sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])


    val DATAFRAME = hBaseRDD.map(x => {
      (Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("node"), Bytes.toBytes("imei"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("measure"), Bytes.toBytes("Measure"))))

    }).toDF()
      .withColumnRenamed("_1", "GatewayIMEA")
      .withColumnRenamed("_2", "EventTime")
      .withColumnRenamed("_3", "ap")
      .withColumnRenamed("_4", "measure")


    DATAFRAME.show()

  }

}

所做的是设置输入表,设置过滤器,使用过滤器进行扫描并将扫描结果扫描到 RDD,然后将 RDD 转换为数据帧(可选)

执行多个过滤器:

val timestampFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("eventTime"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(String.valueOf(dateOfDayTimestamp)))
val GatewayIDFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(String.valueOf(GatewayIMEA)))

val filters = new FilterList(GatewayIDFilter, timestampFilter)
scan.setFilter(filters)

关于scala - HBASE SPARK 带过滤器的查询,无需加载所有 hbase,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53050639/

相关文章:

java - 使用 Apache Spark 和 Java 增量加载 parquet 文件中的数据

java - Apache Spark 连接具有相同列名(具有不同数据)的表的数据集

java - 如何为 SBT 设置 Java 版本

scala - 将 .csv 文件读入 scala,同时保留部分结构

scala - Spark : How to efficiently have intersections preserving duplicates (in Scala)?

apache-spark - 获取所有 Apache Spark 执行器日志

apache-spark - 如何合并 Spark SQL 查询的结果以避免出现大量小文件/避免空文件

apache-spark - 在 Spark 中使用窗口函数

scala - 为什么 sbt 程序集失败并显示 "Not a valid command: assembly"?

scala - 重写字符串修改功能更实用