apache-spark - 如何将转换后的数据从分区发送到 S3?

标签 apache-spark

我有一个RDD,它太大了collect。我对 RDD 应用了一系列转换,并希望将其转换后的数据直接从我的从节点上的分区发送到 S3。我目前操作如下:

val rdd:RDD = initializeRDD
val rdd2 = rdd.transform
rdd2.first // in order to force calculation of RDD
rdd2.foreachPartition sendDataToS3

不幸的是,发送到 S3 的数据是未转换的。 RDD 看起来和 initializeRDD 阶段完全一样。

这里是 sendDataToS3 的主体:

implicit class WriteableRDD[T](rdd:RDD[T]){

def transform:RDD[String] = rdd map {_.toString}

....
def sendPartitionsToS3(prefix:String) = {
  rdd.foreachPartition { p =>
    val filename = prefix+new scala.util.Random().nextInt(1000000)
    val pw = new PrintWriter(new File(filename))
    p foreach pw.println
    pw.close
    s3.putObject(S3_BUCKET, filename, new File(filename))
  }
  this
}

}

这是用 rdd.transform.sendPartitionsToS3(prefix) 调用的。

如何确保在 sendDataToS3 中发送的数据是转换后的数据?

最佳答案

我猜你的代码中有一个问题没有包含在问题中。

无论如何我都会回答,只是为了确保您了解 RDD.saveAsTextFile。您可以为其提供 S3 上的路径 (s3n://bucket/directory),它会直接从执行程序将每个分区写入该路径。

我很难想象您什么时候需要实现自己的 sendPartitionsToS3 而不是使用 saveAsTextFile

关于apache-spark - 如何将转换后的数据从分区发送到 S3?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33704073/

相关文章:

apache-spark - 不支持的编码 : DELTA_BYTE_ARRAY when reading from Kusto using Kusto Spark connector or using Kusto export with Spark version < 3. 3.0

r - 在 Apache Spark 中使用 R

apache-spark - Spark-Streaming 最早在 kafka 开始偏移时挂起(Kafka 2,spark 2.4.3)

azure - 读取 CSV 时,是否可以选择从第 2 行或以下行开始?

scala - Spark SQL : How to append new row to dataframe table (from another table)

hadoop - 从 ResourceManager GUI 访问终止的 Spark 作业日志

apache-spark - Spark-Shell 的默认执行器和核心数

java - Spark 操作中用户 lib jar 优先于 oozie 共享 lib

scala - 如何在 Spark SQL 中合并两行?

scala - 如何在对RDD中找到最大值?