我有一个RDD
,它太大了collect
。我对 RDD
应用了一系列转换,并希望将其转换后的数据直接从我的从节点上的分区发送到 S3。我目前操作如下:
val rdd:RDD = initializeRDD
val rdd2 = rdd.transform
rdd2.first // in order to force calculation of RDD
rdd2.foreachPartition sendDataToS3
不幸的是,发送到 S3 的数据是未转换的。 RDD
看起来和 initializeRDD
阶段完全一样。
这里是 sendDataToS3 的主体:
implicit class WriteableRDD[T](rdd:RDD[T]){
def transform:RDD[String] = rdd map {_.toString}
....
def sendPartitionsToS3(prefix:String) = {
rdd.foreachPartition { p =>
val filename = prefix+new scala.util.Random().nextInt(1000000)
val pw = new PrintWriter(new File(filename))
p foreach pw.println
pw.close
s3.putObject(S3_BUCKET, filename, new File(filename))
}
this
}
}
这是用 rdd.transform.sendPartitionsToS3(prefix)
调用的。
如何确保在 sendDataToS3
中发送的数据是转换后的数据?
最佳答案
我猜你的代码中有一个问题没有包含在问题中。
无论如何我都会回答,只是为了确保您了解 RDD.saveAsTextFile
。您可以为其提供 S3 上的路径 (s3n://bucket/directory
),它会直接从执行程序将每个分区写入该路径。
我很难想象您什么时候需要实现自己的 sendPartitionsToS3
而不是使用 saveAsTextFile
。
关于apache-spark - 如何将转换后的数据从分区发送到 S3?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33704073/