scala - Spark : java. io.NotSerializableException : com. amazonaws.services.s3.AmazonS3Client

标签 scala amazon-web-services apache-spark amazon-s3

我正在尝试从 S3 读取大量大文件,如果作为 Dataframe 函数完成,这将花费相当多的时间。所以按照这个 post和相关的gist我正在尝试使用 RDD 并行读取 s3 对象,如下所示

def dfFromS3Objects(s3: AmazonS3, bucket: String, prefix: String, pageLength: Int = 1000) = {
    import com.amazonaws.services.s3._
    import model._
    import spark.sqlContext.implicits._

    import scala.collection.JavaConversions._

    val request = new ListObjectsRequest()
    request.setBucketName(bucket)
    request.setPrefix(prefix)
    request.setMaxKeys(pageLength)

    val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.

    spark.sparkContext.parallelize(objs.getObjectSummaries.map(_.getKey).toList)
      .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }.toDF()
  }

测试结束时是

Caused by: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client
Serialization stack:
    - object not serializable (class: com.amazonaws.services.s3.AmazonS3Client, value: com.amazonaws.services.s3.AmazonS3Client@35c8be21)
    - field (class: de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, name: s3$1, type: interface com.amazonaws.services.s3.AmazonS3)
    - object (class de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    ... 63 more

我知道我提供的 AmazonS3 对象需要传送给执行者,因此需要是可序列化的,但这是来自一个示例片段,意味着有人让它工作,需要帮​​助来弄清楚是什么我在这里失踪了吗

最佳答案

在要点中,s3 被定义为将为每次调用创建一个新客户端的方法。不推荐这样做。解决该问题的一种方法是使用 mapPartitions

spark
  .sparkContext
  .parallelize(objs.getObjectSummaries.map(_.getKey).toList)
  .mapPartitions { it =>
    val s3 = ... // init the client here
    it.flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
  }
  .toDF

这仍然会为每个 JVM 创建多个客户端,但可能比为每个对象创建一个客户端的版本少得多。如果你想在 JVM 内的线程之间重用客户端,你可以,例如将其包装在顶级对象中

object Foo {
  val s3 = ...
}

并为客户端使用静态配置。

关于scala - Spark : java. io.NotSerializableException : com. amazonaws.services.s3.AmazonS3Client,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56147616/

相关文章:

Scala:如何对泛型类的类型参数提出要求?

android - 如何显示来自AWS S3的图像而不下载到本地存储

mysql - RDS 数据库存储空间不足

scala - 在文本文件中写入/存储数据帧

scala - Spark DataFrame - 从列中删除空值

scala - 为什么 scalac 在重载 'andThen' 时会对未实现 Function 的 SAM 类型感到困惑?

scala - 我什么时候会在 IntelliJ IDEA 中使用 "Mark directory as ..."选项?

scala - 类型类解析中的隐式类别

amazon-web-services - 使用 Ref 作为 Fn::Sub 内部函数中的第一个参数

scala - 设置 Cassandra 表扫描上的 Spark 任务数