python - What is the fastest way to insert data into HBase?

Tags: python hadoop hbase

I am trying to insert data into HBase as quickly as possible. I tried the approach shown at the bottom, but I get the error shown below. Does anyone know what is going wrong and how I can fix it? Perhaps hbase org.apache.hadoop.hbase.mapreduce.ImportTsv would be more efficient? Or should I use PySpark or some other tool instead? What will give me the best performance? Let me know if any other information is needed. Thanks in advance.
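As a point of comparison, the ImportTsv tool mentioned above can bulk-load a delimited file directly, bypassing the Thrift layer entirely. A sketch of the invocation, assuming the table and column mapping below (`data:log` mirrors the column family used in the script; the input file must be reachable from the cluster, typically on HDFS, and `-Dimporttsv.separator` is needed because ImportTsv expects tab-separated input by default):

```shell
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.separator=',' \
  -Dimporttsv.columns=HBASE_ROW_KEY,data:log \
  sample_data:rfic /path/to/hbase/logs2.csv
```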

Error

Connect to HBase. table name: rfic, batch size: 1000
Connected to file. name: /path/to/hbase/logs2.csv
Traceback (most recent call last):
  File "insert-data2.py", line 87, in <module>
    batch.send()
  File "/usr/local/lib/python3.4/dist-packages/happybase/batch.py", line 60, in send
    self._table.connection.client.mutateRows(self._table.name, bms, {})
  File "/usr/local/lib/python3.4/dist-packages/thriftpy/thrift.py", line 198, in _req
    return self._recv(_api)
  File "/usr/local/lib/python3.4/dist-packages/thriftpy/thrift.py", line 210, in _recv
    fname, mtype, rseqid = self._iprot.read_message_begin()
  File "thriftpy/protocol/cybin/cybin.pyx", line 439, in cybin.TCyBinaryProtocol.read_message_begin (thriftpy/protocol/cybin/cybin.c:6470)
cybin.ProtocolError: No protocol version header

Here is my source code, but I would also appreciate any other workable solution.

insert-data2.py
import csv
import happybase
import time

batch_size = 1000
host = "0.0.0.0"
file_path = "/path/to/hbase/logs2.csv"
namespace = "sample_data"
row_count = 0
start_time = time.time()
table_name = "rfic"


def connect_to_hbase():
    """ Connect to HBase server.

    This will use the host, namespace, table name, and batch size as defined in
    the global variables above.
    """
    conn = happybase.Connection(host=host,
                                table_prefix=namespace,
                                table_prefix_separator=":")
    conn.open()
    table = conn.table(table_name)
    batch = table.batch(batch_size=batch_size)
    return conn, batch


def insert_row(batch, row):
    """ Insert a row into HBase.

    Write the row to the batch. When the batch size is reached, rows will be
    sent to the database.

    Rows have the following schema:
        [ id, keyword, subcategory, type, township, city, zip, council_district,
          opened, closed, status, origin, location ]
    """
    batch.put(row[0], { "data:log": row[1]})


def read_csv():
    csvfile = open(file_path, "r")
    csvreader = csv.reader(csvfile)
    return csvreader, csvfile


# After everything has been defined, run the script.
conn, batch = connect_to_hbase()
print("Connect to HBase. table name: %s, batch size: %i" % (table_name, batch_size))
csvreader, csvfile = read_csv()
print("Connected to file. name: %s" % file_path)

try:
    # Loop through the rows. The first row contains column headers, so skip that
    # row. Insert all remaining rows into the database.
    for row in csvreader:
        row_count += 1
        if row_count == 1:
            continue  # the first row holds column headers
        insert_row(batch, row)

    # If there are any leftover rows in the batch, send them now.
    batch.send()
finally:
    # No matter what happens, close the file handle.
    csvfile.close()
    conn.close()

duration = time.time() - start_time
print("Done. row count: %i, duration: %.3f s" % (row_count, duration))
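Independent of HBase, the batch-and-flush pattern the script relies on (accumulate rows, send every `batch_size` rows, flush the remainder at the end) can be sketched in pure Python; the helper name and sample data below are illustrative:

```python
import csv
import io

def batched_rows(csvfile, batch_size):
    """Yield lists of at most batch_size rows, skipping the header row."""
    reader = csv.reader(csvfile)
    next(reader, None)          # skip the column-header row
    batch = []
    for row in reader:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:                   # leftover rows, like the final batch.send()
        yield batch

# Example with an in-memory CSV: header + 5 rows, batch size 2
data = io.StringIO("id,log\n1,a\n2,b\n3,c\n4,d\n5,e\n")
sizes = [len(b) for b in batched_rows(data, 2)]
print(sizes)  # [2, 2, 1]
```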

Accepted answer

This happens when the Thrift server is not running. You need to start Thrift on the HBase cluster using the hbase-daemon.sh script:

 /<hbase_home>/bin/hbase-daemon.sh start thrift

The HBase Thrift interface lets other languages access HBase by connecting to a Thrift server; it acts as a gateway to HBase.

When connecting via the happybase library, use the port the Thrift server is listening on (9090 by default).
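Before creating the `happybase.Connection`, a quick TCP pre-flight check against the Thrift port turns the cryptic protocol error into an actionable message. The helper below is an illustrative sketch, not part of happybase:

```python
import socket

def thrift_port_open(host, port=9090, timeout=2.0):
    """Return True if something accepts TCP connections at host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Use before connecting, e.g.:
#   if not thrift_port_open(host):
#       raise SystemExit("HBase Thrift server is not reachable; start it first")
```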

Regarding "python - What is the fastest way to insert data into HBase?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52988175/
