I am trying to read a large number of big files from S3, which takes considerable time if done as a Dataframe function. So, following this post and the related gist, I am trying to use an RDD to read the S3 objects in parallel, as below:
def dfFromS3Objects(s3: AmazonS3, bucket: String, prefix: String, pageLength: Int = 1000) = {
  import com.amazonaws.services.s3._
  import model._
  import java.io.InputStream
  import scala.io.Source
  import spark.sqlContext.implicits._
  import scala.collection.JavaConversions._

  val request = new ListObjectsRequest()
  request.setBucketName(bucket)
  request.setPrefix(prefix)
  request.setMaxKeys(pageLength)
  // Note that this method returns truncated data if longer than the "pageLength" above.
  // You might need to deal with that — see the paging sketch after this snippet.
  val objs: ObjectListing = s3.listObjects(request)
  spark.sparkContext.parallelize(objs.getObjectSummaries.map(_.getKey).toList)
    .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }.toDF()
}
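(Aside: the truncation flagged in the comment above can be handled by paging through the listing. A minimal sketch, assuming the AWS SDK v1 paging API listNextBatchOfObjects; the helper name allKeys is hypothetical:)

import scala.collection.JavaConverters._
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ListObjectsRequest

// Hypothetical helper: collects every key under the prefix by following
// truncated listings with listNextBatchOfObjects until isTruncated is false.
def allKeys(s3: AmazonS3, request: ListObjectsRequest): List[String] = {
  val keys = scala.collection.mutable.ListBuffer.empty[String]
  var listing = s3.listObjects(request)
  keys ++= listing.getObjectSummaries.asScala.map(_.getKey)
  while (listing.isTruncated) {
    listing = s3.listNextBatchOfObjects(listing)
    keys ++= listing.getObjectSummaries.asScala.map(_.getKey)
  }
  keys.toList
}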
When tested, the function above ends up with:
Caused by: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client
Serialization stack:
- object not serializable (class: com.amazonaws.services.s3.AmazonS3Client, value: com.amazonaws.services.s3.AmazonS3Client@35c8be21)
- field (class: de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, name: s3$1, type: interface com.amazonaws.services.s3.AmazonS3)
- object (class de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more
I understand that the AmazonS3 object I supply needs to be shipped to the executors, and hence needs to be serializable, but this is from a sample snippet, meaning someone got it working. I need help figuring out what I am missing here.
Best Answer
In the gist, s3 is defined as a method that creates a new client on every call. This is not recommended. One way around the problem is to use mapPartitions:
spark
  .sparkContext
  .parallelize(objs.getObjectSummaries.map(_.getKey).toList)
  .mapPartitions { it =>
    val s3 = ... // init the client here, on the executor
    it.flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
  }
  .toDF
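Putting the pieces together, here is a minimal sketch of the full method; note that constructing the client with AmazonS3ClientBuilder.defaultClient() is an assumption (the answer leaves the initialization elided), and it relies on the SDK's default credential and region provider chain:

def dfFromS3Objects(bucket: String, prefix: String, pageLength: Int = 1000) = {
  import java.io.InputStream
  import scala.io.Source
  import scala.collection.JavaConverters._
  import spark.sqlContext.implicits._
  import com.amazonaws.services.s3.AmazonS3ClientBuilder
  import com.amazonaws.services.s3.model.ListObjectsRequest

  // Driver-side client, used only for listing; it never leaves the driver.
  val lister = AmazonS3ClientBuilder.defaultClient()
  val request = new ListObjectsRequest()
  request.setBucketName(bucket)
  request.setPrefix(prefix)
  request.setMaxKeys(pageLength)
  val keys = lister.listObjects(request).getObjectSummaries.asScala.map(_.getKey).toList

  spark.sparkContext
    .parallelize(keys)
    .mapPartitions { it =>
      // Executor-side client, created once per partition instead of once per object.
      val s3 = AmazonS3ClientBuilder.defaultClient()
      it.flatMap { key =>
        Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines
      }
    }
    .toDF()
}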
This will still create multiple clients per JVM, but likely far fewer than the version that creates one per object. If you want to reuse a client between threads within a JVM, you can, for example, wrap it in a top-level object:
object Foo {
  val s3 = ...
}
and use a static configuration for the client.
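For example, a minimal sketch of such a top-level holder (again assuming AmazonS3ClientBuilder; the object name S3Client and the region are placeholders, not part of the answer):

import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

object S3Client {
  // Built lazily, once per JVM, and shared by all threads/tasks in it;
  // AmazonS3 clients are thread-safe, so sharing is fine.
  lazy val s3: AmazonS3 = AmazonS3ClientBuilder
    .standard()
    .withRegion("eu-central-1") // placeholder region; configure statically for your setup
    .build()
}

Tasks then refer to S3Client.s3 inside their closures; since a Scala object is resolved on the executor's JVM rather than captured by the closure, the non-serializable client never needs to be serialized.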
Regarding scala - Spark: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client, see the original question on Stack Overflow: https://stackoverflow.com/questions/56147616/