hadoop - Spark Streaming - HBase 批量加载

标签 hadoop hbase pyspark

我目前正在使用 Python 将 CSV 数据批量加载到 HBase 表中,目前我在使用 saveAsNewAPIHadoopFile 编写适当的 HFile 时遇到了问题

我的代码目前如下所示:

def csv_to_key_value(row):
    cols = row.split(",")
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
              (cols[0], [cols[0], "f2", "c2", cols[2]]),
              (cols[0], [cols[0], "f3", "c3", cols[3]]))
    return result

def bulk_load(rdd):
    conf = {#Ommitted to simplify}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)
    if not load_rdd.isEmpty():
        load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                        "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                        conf=conf,
                                        keyConverter=keyConv,
                                        valueConverter=valueConv)
    else:
        print("Nothing to process")

当我运行这段代码时,出现以下错误:

java.io.IOException:添加了一个词法上不大于以前的键。当前单元格 = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0,lastCell =/f1:c1/1453891407212/Minimum/vlen=1/seqid=0 在 org.apache.hadoop.hbase.io。 hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)

由于错误表明是键的问题,所以我从我的 RDD 中抓取了元素,它们如下(为了便于阅读而格式化)

[(u'1', [u'1', 'f1', 'c1', u'A']),
 (u'1', [u'1', 'f2', 'c2', u'1A']),
 (u'1', [u'1', 'f3', 'c3', u'10']),
 (u'2', [u'2', 'f1', 'c1', u'B']),
 (u'2', [u'2', 'f2', 'c2', u'2B']),
 (u'2', [u'2', 'f3', 'c3', u'9']),

. . .

 (u'9', [u'9', 'f1', 'c1', u'I']),
 (u'9', [u'9', 'f2', 'c2', u'3C']),
 (u'9', [u'9', 'f3', 'c3', u'2']),
 (u'10', [u'10', 'f1', 'c1', u'J']),
 (u'10', [u'10', 'f2', 'c2', u'1A']),
 (u'10', [u'10', 'f3', 'c3', u'1'])]

这与我的 CSV 完全匹配,顺序正确。据我了解,在 HBase 中,一个键由 {row, family, timestamp} 定义。行和族的组合对于我数据中的所有条目来说都是唯一且单调递增的,而且我无法控制时间戳(这是我能想象到的唯一问题)

谁能告诉我如何避免/预防此类问题?

最佳答案

嗯,这只是我的一个愚蠢错误,我觉得有点愚蠢。按字典顺序,顺序应为 1、10、2、3 ... 8、9。在加载前保证正确排序的最简单方法是:

rdd.sortByKey(true);

我希望我至少可以让一个人摆脱我的头痛。

关于hadoop - Spark Streaming - HBase 批量加载,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35035493/

相关文章:

Hadoop 和分析?

hadoop - 使用mapreduce处理文件

pandas - 在 PySpark 的 pandas_udf 中使用外部库

python - 使用 Pyspark 并行化 HTTP 请求

database-design - Hadoop Hbase : Spreading column families across tables or not

hadoop - 在 Hadoop 中将多个文件合并为一个文件

hadoop - 从Java客户端运行LoadIncrementalHFiles

java - 如何将字节数组写入和读取到 DataInput 和 DataOutput Stream

java - 无法使用PreparedStatement在Phoenix中插入行

python-3.x - 我们可以用 Sparktrials 保存 Hyperopt 试验的结果吗