python - 当使用 hbase 作为数据源时,spark 是否利用 hbase 键的排序顺序

标签 python hadoop mapreduce apache-spark hbase

我将时间序列数据存储在HBase中。 rowkey由user_idtimestamp组成,像这样:

{
    "userid1-1428364800" : {
        "columnFamily1" : {
            "val" : "1"
            }
        }
    }
    "userid1-1428364803" : {
        "columnFamily1" : {
            "val" : "2"
            }
        }
    }

    "userid2-1428364812" : {
        "columnFamily1" : {
            "val" : "abc"
            }
        }
    }

}

现在我需要执行每个用户的分析。这是 hbase_rdd 的初始化(来自 here )

sc = SparkContext(appName="HBaseInputFormat")

conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)

自然的类似 mapreduce 的处理方式是:

hbase_rdd
   .map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1])))  # shift timestamp from key to value
   .groupByKey()
   .map(processUserData)  # process user's data

在执行第一个映射(将时间戳从键转换为值)时,了解当前用户的时间序列数据何时完成是至关重要的,因此可以启动 groupByKey 转换。因此我们不需要映射所有表并存储所有临时数据。这是可能的,因为 hbase 按排序顺序存储行键。

使用 hadoop 流式处理可以这样完成:

import sys

current_user_data = []
last_userid = None
for line in sys.stdin:
    k, v = line.split('\t')
    userid, timestamp = k.split('-')
    if userid != last_userid and current_user_data:
        print processUserData(last_userid, current_user_data)
        last_userid = userid
        current_user_data = [(timestamp, v)]
    else:
        current_user_data.append((timestamp, v))

问题是:如何在 Spark 中利用 hbase 键的排序顺序?

最佳答案

我不是很熟悉你从 HBase 中提取数据的方式所获得的保证,但如果我理解正确,我可以用普通的旧 Spark 来回答。

你有一些RDD[X]。据 Spark 所知,RDD 中的 X 是完全无序的。但是你有一些外部知识,你可以保证数据实际上是按 X 的某个字段分组的(甚至可能按另一个字段排序)。

在这种情况下,您可以使用 mapPartitions 来完成与使用 hadoop 流式传输所做的几乎相同的事情。这使您可以遍历一个分区中的所有记录,因此您可以查找具有相同键的记录 block 。

val myRDD: RDD[X] = ...
val groupedData: RDD[Seq[X]] = myRdd.mapPartitions { itr =>
  var currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
  var currentUser: X = null
  //itr is an iterator over *all* the records in one partition
  itr.flatMap { x => 
    if (currentUser != null && x.userId == currentUser.userId) {
      // same user as before -- add the data to our list
      currentUserData += x
      None
    } else {
      // its a new user -- return all the data for the old user, and make
      // another buffer for the new user
      val userDataGrouped = currentUserData
      currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
      currentUserData += x
      currentUser = x
      Some(userDataGrouped)
    }
  }
}
// now groupedRDD has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle.  Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory -- we could still
// do more filtering on the fly, eg:
val usersWithLotsOfData = groupedRDD.filter{ userData => userData.size > 10 }

我知道您想使用 python —— 抱歉,我认为如果我用 Scala 编写,我更有可能得到正确的示例。而且我认为类型注释使含义更清楚,但这可能是 Scala 的偏见......:)。无论如何,希望你能理解发生了什么并翻译它。 (不要太担心 flatMap & Some & None,如果你理解这个想法可能不重要......)

关于python - 当使用 hbase 作为数据源时,spark 是否利用 hbase 键的排序顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29493472/

相关文章:

Python 3.4 - 列表元素可以进行 str() 而不是 int() 转换

python - openCV的特征脸

c# - 如何从 C# 代码中获取列表到 python 中的列表

asp.net-mvc-4 - 在ASP.NET MVC中收集并显示Hadoop MapReduce结果?

hadoop - 表中不同值类型的Hive CSV Serde格式

java - 有没有一种方法可以限制我的mapreduce(JAVA)作业产生的记录量?

python - 可以将任何算法实现到 hadoop 流式 mapreduce 工作中吗?

performance - Hadoop 等开源计算平台的效率如何?

python - 使用类方法作为 celery 任务

python - Hadoop 2.7 : MapReduce task's total time using streaming API