scala - 如何在 Spark Streaming Scala 中对 HBase 进行单元测试

标签 scala unit-testing apache-spark hbase spark-streaming

我正在尝试单元测试 doSomethingRdd这需要在 rdd 转换中从 HBase 读取一些引用数据。

def doSomethingRdd(in: DStream[String]): DStream[String] = {
    in.map(i => {
        val cell = HbaseUtil.getCell("myTable", "myRowKey", "myFamily", "myColumn") 
        i + cell.getOrElse("")
    })
}

Object HBaseUtil {
    def getCell(tableName: String, rowKey: String, columnFamily: String, column: String): Option[String] = {
    val HBaseConn = ConnectionPool.getConnection()
    //the rest of the code will use HBaseConn 
    //to get a HBase cell and convert to a string
    }
}

我读了这个 Cloudera article但我对他们推荐的方法有一些问题。

我尝试的第一件事是使用 ScalaMock 来模拟 HBaseUtil.getUtil方法,以便我可以绕过 HBase 连接。我还做了一些解决方法来模拟这个 article 建议的对象单例.我更新了我的代码,如下所示。然而,doSomethingRdd失败,因为模拟的 hbaseUtil 不是序列化,Paul Butcher 在他的 reply 中也解释了这一点。
def doSomethingRdd(in: DStream[String], hbaseUtil: HBaseUtilBody:HBaseUtil): DStream[String] = {
    in.map(i => {
        val cell = HbaseUtil.getCell("myTable", "myRowKey", "myFamily", "myColumn") 
        i + cell.getOrElse("")
    })
}

trait HBaseUtilBody {
    def getCell(tableName: String, rowKey: String, columnFamily: String, column: String): Option[String] = {
    val HBaseConn = ConnectionPool.getConnection()
    //the rest of the code will use HBaseConn 
    //to get a HBase cell and convert to a string
    }
}

object HBaseUtil extends HBaseUtilBody

我认为在 RDD 转换中从 HBase 获取数据将是一种非常常见的模式。但我不确定如何在不连接到真实 HBase 实例的情况下对其进行单元测试。

最佳答案

在 2020 年的 HBase 2.x 中,我们使用 hbase-testing-util .只需将其添加到您的 SBT 构建文件中

// https://mvnrepository.com/artifact/org.apache.hbase/hbase-testing-util

libraryDependencies += "org.apache.hbase" % "hbase-testing-util" % "2.2.3" % Test


然后像这样建立连接
import org.apache.hadoop.hbase.HBaseTestingUtility

val utility = new HBaseTestingUtiliy
utility.startMiniCluster() // defaults to 1 master, 1 region server and 1 data node
val connection = utility.getConnection()
启动 MiniCluster 实际启动
  • MiniDFSCluster
  • MiniZKCluster 和
  • MiniHBaseCluster

  • 如果您需要添加一些特定的配置(例如安全设置),您可以将 hbase-site.xml 添加到您的资源中。
    有关更多信息,请参阅 HBase Reference Guide 中使用 HBase 迷你集群的集成测试部分。 .

    关于scala - 如何在 Spark Streaming Scala 中对 HBase 进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39516015/

    相关文章:

    java - Apache Spark Maven POM 错误

    c# - 为什么或如何将 NUnit 方法与 ICollection<T> 一起使用

    unit-testing - Wicket/MVC 架构/测试问题

    apache-spark - 如何让 Spark 快速清晰地失败

    scala - 控制数据流图或中间表示

    scala - 如何将集合作为新列附加到具有许多列的 DataFrame?

    java - 这真的是从 Java 将 void 函数传递给 Scala 方法的方法吗?

    java - Java 中的单元测试 - 它是什么?

    python - Spark 程序在独立集群上运行时给出奇怪的结果

    python - Spark DataFrame 运算符(唯一、乘法)