scala - AWS Glue RDD.saveAsTextFile() throws Class org.apache.hadoop.mapred.DirectOutputCommitter not found

Tags: scala apache-spark rdd aws-glue

I am building a simple ETL job that reads a billion files and repartitions them (in other words, compacts them into a smaller number of files for further processing).

A simple AWS Glue application:

import org.apache.spark.SparkContext

object Hello {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val input_path =  "s3a://my-bucket-name/input/*"
    val output_path = "s3a://my-bucket-name/output/*"
    val num_partitions = 5
    val ingestRDD = spark.textFile(input_path)
    ingestRDD.repartition(num_partitions).saveAsTextFile(output_path)    
  }
}

It throws the following stack trace:

ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Exception in User Class: java.lang.RuntimeException : java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.mapred.DirectOutputCommitter not found
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2401)
org.apache.hadoop.mapred.JobConf.getOutputCommitter(JobConf.java:725)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1048)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
Hello$.main(hello_world_parallel_rdd_scala:18)
Hello.main(hello_world_parallel_rdd_scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
com.amazonaws.services.glue.SparkProcessLauncherPlugin$class.invoke(ProcessLauncher.scala:38)
com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:67)
com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:108)
com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:21)
com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)

At the same time, this code works in a local environment, on a cluster, and on an EMR cluster.
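One way to confirm the environment difference (a diagnostic sketch, not part of the original question) is to print the configured committer class in each environment and compare:

// Prints the committer configured for the old mapred API, or null if unset.
println(spark.hadoopConfiguration.get("mapred.output.committer.class"))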

Best Answer

import org.apache.spark.SparkContext

object Hello {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    // Point the old mapred API at a committer implementation that is actually
    // present on the Glue classpath instead of the missing DirectOutputCommitter.
    spark.hadoopConfiguration.set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")
    val input_path =  "s3a://my-bucket-name/input/*"
    // Note: saveAsTextFile expects a target directory, not a glob.
    val output_path = "s3a://my-bucket-name/output"
    val num_partitions = 5
    val ingestRDD = spark.textFile(input_path)
    ingestRDD.repartition(num_partitions).saveAsTextFile(output_path)    
  }
}
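
Why this works (an inference from the stack trace, not stated in the original answer): the exception is raised inside JobConf.getOutputCommitter, meaning Hadoop's old mapred API instantiates whatever class the mapred.output.committer.class property points to. On Glue that property apparently resolves to org.apache.hadoop.mapred.DirectOutputCommitter, which is not shipped on the Glue classpath, so overriding it with DirectFileOutputCommitter, which is available there, lets the lookup succeed. This also suggests why the same code runs unchanged locally and on EMR: those environments either configure a different committer by default or ship that class.

As a side note, since this job only reduces the partition count, coalesce can avoid the full shuffle that repartition performs. A minimal sketch under the same assumptions (bucket name and paths are placeholders); be aware that coalesce also caps read parallelism at the target partition count, so repartition may still be faster when the input is very large:

import org.apache.spark.SparkContext

object Hello {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    // Same committer fix as in the accepted answer above.
    spark.hadoopConfiguration.set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")
    val ingestRDD = spark.textFile("s3a://my-bucket-name/input/*")
    // coalesce(5) merges existing partitions without shuffling every record.
    ingestRDD.coalesce(5).saveAsTextFile("s3a://my-bucket-name/output")
  }
}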

Regarding scala - AWS Glue RDD.saveAsTextFile() throws Class org.apache.hadoop.mapred.DirectOutputCommitter not found, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65409534/
