我对 Spark 很陌生。我正在 1.6.1 中工作。 假设我有一个大文件,我正在通过 textFile 将其读入 RDD[String] 中。 然后我想验证某个函数中的每一行。 因为文件很大,所以我想在达到一定数量的错误时停止处理,比如说 1000 行。 类似的东西
val rdd = SparkContext.textFile(文件名)
rdd.map(line => myValidator.validate(line))
这是验证函数:
def validate(line:String) : (String, String) = {
//元组中的第一个用于结果行,第二个用于验证错误。
}
如何计算“validate”内部的错误?它实际上是在多个节点上并行执行的?广播?累加器?
最佳答案
您可以利用 Spark 的惰性来实现此行为,方法是将解析结果“拆分”为成功和失败,对失败调用 take(n)
,并且仅在存在时才使用成功数据少于 n
次失败。
为了更方便地实现这一点,我建议更改 validate
的签名以返回某种可以轻松区分成功与失败的类型,例如scala.util.Try
:
def validate(line:String) : Try[String] = {
// returns Success[String] on success,
// Failure (with details in the exception object) otherwise
}
然后,类似:
val maxFailures = 1000
val rdd = sparkContext.textFile(fileName)
val parsed: RDD[Try[String]] = rdd.map(line => myValidator.validate(line)).cache()
val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures)
if (failures.size == maxFailures) {
// report failures...
} else {
val success: RDD[String] = parsed.collect { case Success(s) => s }
// continue here...
}
为什么这会起作用?
- 如果失败次数少于 1000 次,则调用
take(maxFailures)
时将解析整个数据集,成功的数据将被缓存并可供使用 - 如果失败次数为 1000 次或更多,解析将在此停止,因为
take
操作不再需要读取
关于scala - 在出现一定数量的错误后停止在 Apache Spark 中处理大型文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37581652/