scala - What is the best way to call Hadoop FileSystem operations from a Serializable Scala object

标签 scala apache-spark hadoop serialization

/What I am trying to do/

I want to run some Spark UDF transformations over several HDFS buckets containing BZ2 files. I defined a MyMain Scala object that extends Serializable, since it involves invoking the UDF transformations on each HDFS bucket.

However, before running the UDF transformations I need to filter down to the HDFS buckets that actually contain BZ2 files. That requires Hadoop FileSystem operations, which I keep inside the MyMain.main method so that these computations stay in driver memory and are not shipped to the worker nodes, because as far as I know FileSystem is not serializable.
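
For context, a minimal driver-only check of this kind could look like the sketch below (the object name and the example path are illustrative, not part of my actual code); FileSystem.globStatus can match the *.bz2 pattern directly:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged sketch of a driver-side check: the FileSystem is created and used
// entirely in the driver, so it never has to be serialized. The Option guards
// against a possible null result from globStatus.
object DriverSideBz2Check {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val hasBz2 =
      Option(fs.globStatus(new Path("/path/to/buckets/Bucket1/*.bz2"))).exists(_.nonEmpty)
    println(s"Bucket1 contains .bz2 files: $hasBz2")
  }
}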

However, even though I put all the FileSystem operations into a separate HadoopUtils class marked Serializable, exposed an instance of it through a singleton companion object, and called everything from MyMain.main, I still get the "Task not serializable" exception (below).

/Question/

What is the right way to call the non-serializable FileSystem operations from a serializable object such as MyMain? And why does class HadoopUtils extends Serializable still appear not to be serializable, even though it is declared that way?

/My code/

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Prepends x1 to every element of x2 (top-level val in the spark-shell session)
val prependtoList = (x1: String, x2: List[String]) => x2.map(x1 + _)

class HadoopUtils extends Serializable {

  // True if `path` exists and is a directory
  def existsDir(fs: FileSystem, path: String): Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }

  // True if the bucket directory contains at least one .bz2 file
  def ifBZFileExists(fs: FileSystem, bucketBZDir: String): Boolean = {
    val path = new Path(bucketBZDir)
    fs.listStatus(path).exists(p => p.isFile && p.getPath.getName.endsWith(".bz2"))
  }

  // Build the candidate bucket paths (Bucket1..Bucket16) under lookupPath and
  // keep only the existing buckets that contain at least one BZ2 file.
  def getBZ2Buckets(fs: FileSystem, lookupPath: String): List[String] = {
    val range = (1 to 16).toList.map(_.toString)
    val buckets = prependtoList("Bucket", range)
    val allBuckets = prependtoList(lookupPath + "/", buckets)
    // From Bucket1 to Bucket16, keep only the buckets that actually exist (e.g. Bucket5 may not)
    val existingBuckets = allBuckets.filter(p => existsDir(fs, p))
    existingBuckets.filter(path => ifBZFileExists(fs, path)).map(path => path + "/*.bz2")
  }
}

// Singleton object giving access to a HadoopUtils instance
// (compiled as HadoopUtils$, the class named in the stack trace below)
object HadoopUtils {
  val getHadoopUtils = new HadoopUtils
}

object MyMain extends Serializable {
  val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
  val basePath = "/path/to/buckets"
  def main(args: Array[String]): Unit = {
    //NOTE: spark, hadoopfs defined in main so as to be processed in Driver
    val spark = SparkSession
      .builder()
      .appName("My_App")
      .enableHiveSupport()
      .getOrCreate()

    val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    val BZ2Buckets =
      HadoopUtils.getHadoopUtils.getBZ2Buckets(hadoopfs, clusterNameNodeURL + basePath)

    BZ2Buckets.foreach(path => {
      //Doing Spark UDF transformations on each bucket, which needs to be serialized
    })


  }
}

/Exception stack trace/
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:197)
  ... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
    - object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
    - field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
    - object (class $iw, $iw@3f4a0d43)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@74d06d1e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@f9764ea)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6821099e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4f509444)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11462802)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11d2d501)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@284fd700)
    - field (class: $line14.$read, name: $iw, type: class $iw)
    - object (class $line14.$read, $line14.$read@46b4206a)
    - field (class: $iw, name: $line14$read, type: class $line14.$read)
    - object (class $iw, $iw@33486894)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@25980fc9)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1fb0d28d)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42ea11d5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42d28cc1)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@22131a73)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@631878e1)
    - field (class: $line18.$read, name: $iw, type: class $iw)
    - object (class $line18.$read, $line18.$read@561c52c0)
    - field (class: $iw, name: $line18$read, type: class $line18.$read)
    - object (class $iw, $iw@1d5b8be2)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@4de4c672)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 85 more

Best answer

It appears that the Task not serializable problem is not specific to either the HadoopUtils class or its companion object as such. Given that, in the driver, the instance of the HadoopUtils class is obtained through the singleton companion object as HadoopUtils.getHadoopUtils, the HadoopUtils class ends up needing to be serialized along with the MyMain object.
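
The Caused by line in the trace above names HadoopUtils$ (the companion object, captured through the spark-shell's $iw wrappers) as the object that cannot be serialized. As a minimal sketch of the general rule, unrelated to Spark or Hadoop: declaring a class Serializable is not enough if something it references is not serializable itself.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustration only (not the code from the question): Wrapper extends
// Serializable, yet serializing an instance fails because its field refers
// to a non-serializable object.
class NotSer
class Wrapper(val dep: NotSer) extends Serializable

object SerializationDemo {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(new Wrapper(new NotSer))
    catch { case e: NotSerializableException => println(s"Failed as expected: $e") }
  }
}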

A solution to this problem can be found here.
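
The linked fix is not reproduced here. One commonly suggested workaround, sketched below assuming the spark, hadoopfs, clusterNameNodeURL and basePath values from MyMain.main above, is to keep the driver-side result as a plain List[String] and reference only those strings (plus Spark APIs) inside the per-bucket processing; alternatively, declaring object HadoopUtils extends Serializable removes this particular NotSerializableException.

// Hedged sketch, not necessarily the linked answer: do the FileSystem work on
// the driver, then let the Spark closures capture only serializable values.
val bz2Globs: List[String] =
  HadoopUtils.getHadoopUtils.getBZ2Buckets(hadoopfs, clusterNameNodeURL + basePath)

bz2Globs.foreach { glob =>
  // Reference only the String `glob` and Spark APIs here; calling HadoopUtils
  // or using hadoopfs inside a Spark closure would pull the non-serializable
  // HadoopUtils$ / FileSystem into the serialized task.
  val lines = spark.read.textFile(glob)
  // ... UDF transformations on `lines` go here ...
}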

Regarding scala - What is the best way to call Hadoop FileSystem operations from a Serializable Scala object, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/58586414/
