scala - 将 Spark 数据帧插入 hbase

标签 scala apache-spark dataframe hbase rdd

我有一个数据框,我想将它插入到 hbase 中。我关注这个documenation .

这是我的数据框的样子:

 --------------------
|id | name | address |
|--------------------|
|23 |marry |france   |
|--------------------|
|87 |zied  |italie   |
 --------------------

我使用这段代码创建了一个 hbase 表:

val tableName = "two"
val conf = HBaseConfiguration.create()
if(!admin.isTableAvailable(tableName)) {
          print("-----------------------------------------------------------------------------------------------------------")
          val tableDesc = new HTableDescriptor(tableName)
          tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
          admin.createTable(tableDesc)
        }else{
          print("Table already exists!!--------------------------------------------------------------------------------------")
        }

现在我该如何将这个数据框插入到 hbase 中?

在另一个示例中,我使用以下代码成功插入到 hbase 中:

val myTable = new HTable(conf, tableName)
    for (i <- 0 to 1000) {
      var p = new Put(Bytes.toBytes(""+i))
      p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(""+(i*5)))
      p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
      p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes(""+i))
      p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes(""+i))
      myTable.put(p)
    }
    myTable.flushCommits()

但现在我卡住了,如何将我的数据帧的每条记录插入到我的 hbase 表中。

感谢您的时间和关注

最佳答案

另一种方法是查看 rdd.saveAsNewAPIHadoopDataset,将数据插入到 hbase 表中。

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "ip's")
    config.set("hbase.zookeeper.property.clientPort","2181")
    config.set(TableInputFormat.INPUT_TABLE, "tableName")

    val newAPIJobConfiguration1 = Job.getInstance(config)
    newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
    newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val df: DataFrame  = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

    val hbasePuts= df.rdd.map((row: Row) => {
      val  put = new Put(Bytes.toBytes(row.getString(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
      put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
      (new ImmutableBytesWritable(), put)
    })

    hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
    }

引用:https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/

关于scala - 将 Spark 数据帧插入 hbase,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44111988/

相关文章:

scala - Spark Scala 中的模棱两可的模式

python - 根据DataFrame从DataFrame列表中删除DataFrame - python?

scala - 使用列名数组聚合 Spark 数据框,保留名称

scala - Spark 查询 : unclosed character literal

regex - 您如何发现 spark 数据框中列格式的异常?

scala - Spark UDF 中的类型不匹配

r - 美观必须是长度1或者与数据问题是ggplot相同

python - python pandas 中的嵌套数据框/索引

java - 在 Scala 中学习并发时出现 NullPointerException

scala - Scala 中对映射值进行求和和分组