scala - 在出现一定数量的错误后停止在 Apache Spark 中处理大型文本文件

标签 scala validation apache-spark text-files

我对 Spark 很陌生。我正在 1.6.1 中工作。 假设我有一个大文件,我正在通过 textFile 将其读入 RDD[String] 中。 然后我想验证某个函数中的每一行。 因为文件很大,所以我想在达到一定数量的错误时停止处理,比如说 1000 行。 类似的东西

val rdd = SparkContext.textFile(文件名) rdd.map(line => myValidator.validate(line))

这是验证函数:

def validate(line:String) : (String, String) = { //元组中的第一个用于结果行,第二个用于验证错误。 }

如何计算“validate”内部的错误?它实际上是在多个节点上并行执行的?广播?累加器?

最佳答案

您可以利用 Spark 的惰性来实现此行为,方法是将解析结果“拆分”为成功和失败,对失败调用 take(n),并且仅在存在时才使用成功数据少于 n 次失败。

为了更方便地实现这一点,我建议更改 validate 的签名以返回某种可以轻松区分成功与失败的类型,例如scala.util.Try:

def validate(line:String) : Try[String] = {
    // returns Success[String] on success, 
    // Failure (with details in the exception object) otherwise 
}

然后,类似:

val maxFailures = 1000
val rdd = sparkContext.textFile(fileName)
val parsed: RDD[Try[String]] = rdd.map(line => myValidator.validate(line)).cache()

val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures)

if (failures.size == maxFailures) { 
  // report failures... 
} else {
  val success: RDD[String] = parsed.collect { case Success(s) => s }
  // continue here...
}

为什么这会起作用?

  • 如果失败次数少于 1000 次,则调用 take(maxFailures) 时将解析整个数据集,成功的数据将被缓存并可供使用
  • 如果失败次数为 1000 次或更多,解析将在此停止,因为 take 操作不再需要读取

关于scala - 在出现一定数量的错误后停止在 Apache Spark 中处理大型文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37581652/

相关文章:

ScalaJS 简单地将复杂输出重定向到指定目录

scala - IntelliJ 似乎没有选择某些 sbt 库,没有代码完成

scala - Spark DataFrame 中的条件连接

azure - 数据 block : difference between mount and direct access of Data Lake Storage Gen 2

scala - 如何使涉及 future 尾递归的函数?

syntax - 奥德斯基对 "bills !*&^%~ code!"是认真的吗?

使用 IDataErrorInfo 提交 WPF 验证

asp.net-mvc - 不要验证 MVC 上的日期字段

javascript - 使用 jquery 显示输入文件上传大小限制的客户端验证消息?

apache-spark - 如何覆盖使用 SparkContext.addFile 添加的文件?