这是我的一个问题already asked在 spark 用户邮件列表上,我希望在这里取得更大的成功。
我不确定它是否与 spark 直接相关,尽管 spark 与我无法轻易解决该问题的事实有关。
我正在尝试使用各种模式从 S3 获取一些文件。我的问题是其中一些模式可能什么都不返回,当它们返回时,我得到以下异常:
org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
... 2 more
我想要一种方法来忽略丢失的文件并且在这种情况下什么也不做。 IMO 的问题是,我不知道模式是否会在实际执行之前返回某些内容,并且 spark 仅在发生操作时才开始处理数据(此处为 reduceByKey
部分)。所以我不能只是在某处捕获错误并让事情继续进行。
一个解决方案是强制 spark 单独处理每条路径,但这可能会在速度和/或内存方面分配成本,因此我正在寻找其他有效的选项。
我正在使用 spark 0.9.1。 谢谢
最佳答案
好的,深入了解 Spark,感谢有人在 spark 用户列表上指导我,我想我明白了:
sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class, LongWritable.class, Text.class, sc.hadoopConfiguration())
.map(new Function<Tuple2<LongWritable, Text>, String>() {
@Override
public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
return arg0._2.toString();
}
})
.count();
还有神奇的 EmptiableTextInputFormat
:
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class EmptiableTextInputFormat extends TextInputFormat {
@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException {
try {
return super.getSplits(arg0);
} catch (InvalidInputException e) {
return Collections.<InputSplit> emptyList();
}
}
}
最终可以检查 InvalidInputException
的消息以获得更精确的信息。
关于hadoop - 绕过 org.apache.hadoop.mapred.InvalidInputException : Input Pattern s3n://[. ..] 匹配 0 个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23783926/