python - 使用 python 将 Spark 2 与 HBase 集成连接的 jars

标签 python pyspark hbase apache-spark-2.0

我在 Spark 2 中使用 pyspark,是否有任何 jars 将 HBasepyspark 连接可用。

请帮助我编写示例代码。

最佳答案

作为之前答案的替代方案:

我正在使用 hortonworks Spark hbase 连接器。您可以在 github 上找到它。他们在 spark summit 上引入了连接器。演讲结束时还有一个带有一些示例代码的现场演示。希望有帮助。

--- 编辑 ---

示例中的代码是用 scala 编写的,但连接器也适用于 pyspark。这是一个写/读示例:

使用连接器启动 PySpark-Shell(也许您必须使用另一个版本的包进行设置 - 查看 github 上的介绍和对话)。

pyspark --master yarn --packages com.hortonworks.shc:shc-core:1.1.0.2.6.5.2-8 --repositories http://nexus-private.hortonworks.com/nexus/content/groups/public/

创建 sql-context 并定义数据源

sqlc = SQLContext(sc)
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'

接下来我们要定义一个目录,它是连接器可读的表结构。当您有权访问 hbase 的默认命名空间时,将 YourNameSpace 替换为 default。如果没有,请插入可访问的命名空间。

catalog = ''.join("""{
    "table":{"namespace":"YourNameSpace", "name":"TestTable", "tableCoder":"PrimitiveType"},
    "rowkey":"key",
    "columns":{
        "ID":{"cf":"rowkey", "col":"key", "type":"string"},
        "DATA":{"cf":"data", "col":"", "type":"string"}
        }
    }""".split())

要创建一个 hbase 表并在其中写入一些内容,我们创建一个包含一些数据的合适数据框......

df = sc.parallelize([('1', 'Moin'), ('2', 'Hello'), ('3', 'Hallo')]).toDF(schema=['ID', 'DATA'])

...并将其保存到 hbase。

df.write.options(catalog=catalog, newtable = 5).format(data_source_format).save()

现在我们可以从 hbase 表中读取内容并将其保存到变量中:

df_read = sqlc.read.options(catalog=catalog).format(data_source_format).load()

检查:

>>> df_read.show()
+---+-----+                                                                     
| ID| DATA|
+---+-----+
|  1| Moin|
|  2|Hello|
|  3|Hallo|
+---+-----+

- 在 HDP 2.5 上使用 PySpark 2 进行测试

关于python - 使用 python 将 Spark 2 与 HBase 集成连接的 jars,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48220001/

相关文章:

python - 删除具有最低值的字典条目

python - 从发送到 spark-submit 的外部 __main__ 文件修改 SparkContext

python - 将 List 元素作为列添加到现有 pyspark 数据帧

configuration - HBaseConfiguration 不读取我在 ${HBASE_HOME}/conf/hbase-site.xml 中设置的值

csv - 使用Apache Pig将数据加载到Hbase表时,如何排除csv或文本文件中某行中没有数据(仅空白)的列?

python - 用数据框中组的平均值替换列值

python - setattr() 和 object.__setattr__() 有什么区别?

apache-spark - sc.parallelize 和 sc.textFile 有什么区别?

hadoop - 带有 syslogs source 和 hbase sink 的 flum agent

python - 没有 __init__ 方法的类初始化