apache-spark - 使用 Pyspark 与 Hbase 交互的最佳方式是什么

标签 apache-spark hadoop pyspark apache-spark-sql hbase

我正在使用 pyspark [spark2.3.1] 和 Hbase1.2.1,我想知道使用 pyspark 访问 Hbase 的最佳方式是什么?

我做了一些初始级别的搜索,发现几乎没有可用的选项,例如使用 shc-core:1.1.1-2.1-s_2.11.jar 这可以实现,但无论我在哪里尝试寻找一些示例,大多数地方的代码都是用 Scala 编写的,或者示例也是基于 Scala 的。我尝试在 pyspark 中实现基本代码:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def main():
    sc = SparkContext()
    sqlc = SQLContext(sc)
    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"firsttable"},
        "rowkey":"key",
        "columns":{
            "firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
            "secondcol":{"cf":"d", "col":"colname", "type":"string"}
        }
    }""".split())
    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.select("secondcol").show()

# entry point for PySpark application
if __name__ == '__main__':
    main()

并使用以下命令运行它:

spark-submit  --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11  --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py

它返回给我空白输出:

+---------+
|secondcol|
+---------+
+---------+

我不知道我做错了什么?也不确定这样做的最佳方法是什么??

如有任何引用,我们将不胜感激。

问候

最佳答案

最后,使用SHC,我可以使用 pyspark 代码连接到 HBase-1.2.1 和 Spark-2.3.1。以下是我的工作:

  • 我的所有 hadoop [namenode、datanode、nodemanager、resourcemanager] 和 hbase [Hmaster、HRegionServer、HQuorumPeer] 守护进程均已在我的 EC2 实例上启动并运行。

  • 我将 emp.csv 文件放置在 hdfs 位置/test/emp.csv,数据为:

key,empId,empName,empWeight
1,"E007","Bhupesh",115.10
2,"E008","Chauhan",110.23
3,"E009","Prithvi",90.0
4,"E0010","Raj",80.0
5,"E0011","Chauhan",100.0
  • 我使用以下代码行创建了readwriteHBase.py文件[用于从HDFS读取emp.csv文件,然后首先在HBase中创建tblEmployee,将数据推送到tblEmployee,然后再次读取来自同一个表的一些数据并将其显示在控制台上]:

    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()
    
        dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"
        writeCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"},
                      "empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}
                    }
                  }""".split())
    
        writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")
        print("csv file read", writeDF.show())
        writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()
        print("csv file written to HBase")
    
        readCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"}
                    }
                  }""".split())
    
        print("going to read data from Hbase table")
        readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
        print("data read from HBase table")
        readDF.select("empId", "empName").show()
        readDF.show()
    
    # entry point for PySpark application
    if __name__ == '__main__':
        main()
    
  • 使用以下命令在虚拟机控制台上运行此脚本:

    spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/ readwriteHBase.py
    
  • 中间结果:读取 CSV 文件后:

    +---+-----+-------+---------+
    |key|empId|empName|empWeight|
    +---+-----+-------+---------+
    |  1| E007|Bhupesh|    115.1|
    |  2| E008|Chauhan|   110.23|
    |  3| E009|Prithvi|     90.0|
    |  4|E0010|    Raj|     80.0|
    |  5|E0011|Chauhan|    100.0|
    +---+-----+-------+---------+
    
  • 最终输出:从 HBase 表读取数据后:

    +-----+-------+
    |empId|empName|
    +-----+-------+
    | E007|Bhupesh|
    | E008|Chauhan|
    | E009|Prithvi|
    |E0010|    Raj|
    |E0011|Chauhan|
    +-----+-------+
    

注意:在创建 Hbase 表并将数据插入 HBase 表时,它预计 NumberOfRegions 应大于 3,因此我添加了 options(catalog=writeCatalog, newtable=5)向HBase添加数据时

关于apache-spark - 使用 Pyspark 与 Hbase 交互的最佳方式是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54826218/

相关文章:

java - 如何使用 livy 提交带有关键字参数的 jar?

java - 无法找到 LoginModule 类 : com. sun.security.auth.module.UnixLoginModule

java - HADOOP - 作为映射器输出生成的输出文件数

python - 如何获取和比较pyspark中两个数据框中相似列的所有值的数据类型

apache-spark - 如何在spark sql中嵌套collect_list?

python - 跨组的 Pyspark 示例数据框

mongodb - 在 Spark 中创建分层 JSON

读取 hadoop SequenceFile 时出现 java.lang.NoClassDefFoundError

python - 无法在 PyCharm 上安装 pyspark

Java Spark 数据编码