scala - Spark RDD 按键查找

标签 scala apache-spark mapreduce hbase rdd

我有一个从 HBase 转换而来的 RDD:

val hbaseRDD: RDD[(String, Array[String])] 其中 tuple._1 是行键。数组是HBase中的值。

4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]

我还有一个 SchemaRDD (id,date1,col1,col2,col3) 转换为

val refDataRDD: RDD[(String, Array[String])] 我将对其进行迭代并检查它是否存在于 hbaseRDD 中:

4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]

问题是,

  • 如何检查 hbaseRDD 中是否存在键 (tuple._1)/("4929103") 并获取相应的值 (tuple._2)? - 我不能在 rdd.filter 中使用 PairRDD 的查找函数,它会抛出“scala.MatchError: null”,但它在外部有效

    val filteredRDD = rdd.filter(sqlRow => {
      val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE")
      // if found, check if date1 of hbaseRDD < sqlRow(1)
      // else if not found, retain row
      true
    })
    

    不过我不确定这是否是问题所在,因为当我将查找行切换到以下位置时我也遇到了 NPE:

    val sqlRowHbase = hbaseRDD.filter(row => {
    

    注意:我在这些行之前执行 hbaseRDD.count。并且 hbaseRDD.lookup 在 rdd.filter 之外工作正常

所以基本上,我试图在 hbaseRDD 中按键“查找”并获取行/值。加入它们有点复杂,因为两个 RDD 中的某些值可能为空。这取决于很多场景,哪些行会保留哪些数据。

最佳答案

假设您需要查找的 a_id 集合包含在 RDD 中,我认为您可以使用 leftOuterJoin 而不是迭代和查找每个值。

我在上面看到了您关于 date1 的潜在可变位置的评论。不过,我没有在下面解决它,我认为这应该在查找本身之前通过每行的某种特定映射来处理。

如果我得到正确的伪代码,你有一个 (id, date) 的 RDD 并且想通过在 hbase 中查找数据来更新它,如果在 hbase 中找到一行则更新日期对于这个 id,如果它的日期早于 refData 中的日期。对吗?

如果是这样,假设你有一些像这样的引用数据:

val refData = sc.parallelize(Array(
 ("4929103","2015-05-21 10:03:44"),
 ("4929104","2015-05-21 10:03:44")
))

还有一些来自 Hbase 的行数据:

val hbaseRDD = sc.parallelize(Array(
    ("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")),
    ("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")),
    ("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44"))
))

然后您可以使用简单的 leftOuterJoin 将 refData 中的每个 id 查找到 hbase 中,对于找到的每一行:如有必要,更新日期:

refData
  // looks up in Hbase all rows whose date1 a_id value matches the id in searchedIds
  .leftOuterJoin(hbaseRDD.map{ case (rowkey, Array(a_id, date1)) => (a_id, date1)})

  // update the date in refData if date from hBase is earlier
  .map { case (rowKey, (refDate, maybeRowDate)) => ( rowKey, chooseDate (refDate, maybeRowDate)) }
  .collect


def chooseDate(refDate: String, rowDate: Option[String]) =  rowDate match {

  // if row not found in Hbase: keep ref date
  case None => refDate

  case Some(rDate) => 
    if (true) /* replace this by first parsing the date, then check if rowDate < refDate */ 
        rowDate
    else
        refDate
}

关于scala - Spark RDD 按键查找,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30421484/

相关文章:

java - 添加静态方法以促进更清洁的单元测试 - 好的做法?

scala - 将 F 有界类型表示为抽象类型成员

scala - Option 包装一个值是一个好的模式吗?

java - Hadoop 将多个部分文件组合成单个文件

android - 为什么 sbt 报告 "No java installations was detected"并设置了 $JAVA_HOME?

scala - 尽管文件大小超出了执行器内存,但如何使用一个分区将数据帧写入 csv 文件

azure - 如何在 ADF 中运行 Spark 作业?

apache-spark - Spark 能引爆一 jar jar 吗

hadoop - Hadoop 如何决定在桶/节点之间分配?

hadoop - Hadoop中的MapReduce作业的物理进程树(在群集节点上)