scala - 通过Spark写入HBase : Task not serializable

标签 scala apache-spark hbase

我正在尝试使用 Spark 1.0 在 HBase (0.96.0-hadoop2) 中写入一些简单的数据,但我不断遇到序列化问题。相关代码如下:

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put

object PutRawDataIntoHbase{
  def main(args: Array[String]): Unit = {
    var propFileName = "hbaseConfig.properties"
    if(args.size > 0){
      propFileName = args(0)
    }

    /** Load properties here **/
   val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
     .map(l => l.split("\t"))
     .map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0) , a(1)))

   val tableName = prop.getProperty("hbase.table.name")
   val hbaseConf = HBaseConfiguration.create()
   hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
   hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
   val myTable = new HTable(hbaseConf, tableName)
   theData.foreach(a=>{
     var p = new Put(Bytes.toBytes(a(0)))
     p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
      myTable.put(p)
    })
  }
}

运行代码结果:

Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable

用map替换foreach不会崩溃,但我也不写。 任何帮助将不胜感激。

最佳答案

HBaseConfiguration 类表示与 HBase 服务器的连接池。显然,它无法被序列化并发送到工作节点。由于 HTable 使用此池与 HBase 服务器通信,因此它也无法序列化。

基本上,可以通过三种方法来处理这个问题:

在每个工作节点上打开连接。

注意foreachPartition方法的使用:

val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
  val hbaseConf = HBaseConfiguration.create()
  <... configure HBase ...>
  val myTable = new HTable(hbaseConf, tableName)
  iter.foreach { a =>
   var p = new Put(Bytes.toBytes(a(0)))
   p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    myTable.put(p)
  }
}

请注意,每个工作节点都必须有权访问 HBase 服务器,并且必须预先安装或通过 ADD_JARS 提供所需的 jar。

另请注意,由于如果为每个分区打开连接池,因此最好将分区数量大致减少到工作节点的数量(使用合并功能)。也可以在每个工作节点上共享一个 HTable 实例,但这并不是那么简单。

将所有数据序列化到单个盒子并写入HBase

可以用一台计算机写入 RDD 中的所有数据,即使数据不适合内存。详细信息在此答案中进行了解释:Spark: Best practice for retrieving big data from RDD to local machine

当然,它会比分布式写入慢,但它很简单,不会带来痛苦的序列化问题,如果数据大小合理,可能是最好的方法。

使用 HadoopOutputFormat

可以为 HBase 创建自定义 HadoopOutputFormat 或使用现有格式。我不确定是否有适合您需求的东西,但 Google 应该可以提供帮助。

P.S. 顺便说一下,map 调用不会崩溃,因为它不会被求值:RDD 不会被求值,直到你用 side- 调用一个函数。影响。例如,如果您调用 theData.map(....).persist,它也会崩溃。

关于scala - 通过Spark写入HBase : Task not serializable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25250774/

相关文章:

scala - 如何在可变的LinkedList的特定位置插入内容?

Scala 和前向引用

scala - Scala 中的函数参数,编写折叠

scala - Spark scala Dataframe isin

hadoop - 使用 Hadoop 和 HBase 的增量 MapReduce

scala - 如何深度复制混合了特征的类

apache-spark - 即使在分区数据中,Spark 也会列出所有叶节点

sql - PySpark/Spark 窗口函数第一期/最后一期

hadoop - HBase中的WAL文件

hadoop - 如何将一些数据发送到 Mapper 类(在 HBase 数据库中的数据上运行)