scala - 如何处理 Spark 和 Scala 中的异常

标签 scala apache-spark exception-handling

我正在尝试处理 Spark 中的常见异常,例如 .map 操作无法对数据的所有元素正常工作或 FileNotFound 异常。我已阅读所有现有问题和以下两篇文章:

https://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html

https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark

我在 attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble 行中尝试了 Try 语句
所以它是 attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble)
但这不会编译;编译器将无法识别 .toDF()稍后声明。我也尝试过类似 Java 的 Try { Catch {}} 块,但无法获得正确的范围; df然后不返回。有谁知道如何正确地做到这一点?我什至需要处理这些异常,因为 Spark 框架似乎已经在处理 FileNotFound 异常,而我没有添加异常。但是,例如,如果输入文件的列数错误,我想生成一个错误,包括模式中的字段数。

这是代码:

object DataLoadTest extends SparkSessionWrapper {
/** Helper function to create a DataFrame from a textfile, re-used in        subsequent tests */
def createDataFrame(fileName: String): DataFrame = {

import spark.implicits._

//try {
val df = spark.sparkContext
  .textFile("/path/to/file" + fileName)
  .map(_.split("\\t"))
//mHealth user is the case class which defines the data schema
  .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
  .toDF()
  .cache()
df
} catch {
    case ex: FileNotFoundException => println(s"File $fileName not found")
    case unknown: Exception => println(s"Unknown exception: $unknown")

}
}

所有建议表示赞赏。谢谢!

最佳答案

另一种选择是使用 Try输入scala。

例如:

def createDataFrame(fileName: String): Try[DataFrame] = {

try {
      //create dataframe df
      Success(df)
    } catch {
      case ex: FileNotFoundException => {
        println(s"File $fileName not found")
        Failure(ex)
      }
      case unknown: Exception => {
        println(s"Unknown exception: $unknown")
        Failure(unknown)
      }
    }
  }

现在,在调用方,处理它:
createDataFrame("file1.csv") match {
  case Success(df) => {
    // proceed with your pipeline
  }
  case Failure(ex) => //handle exception
}

这比使用 Option 稍微好一点,因为调用者会知道失败的原因并且可以更好地处理。

关于scala - 如何处理 Spark 和 Scala 中的异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45894688/

相关文章:

string - 使用 zip 查找 Scala 中两个字符串之间的差异

python - 从连接的 RDD 中提取值

algorithm - 从另一个点有效地找到最近的点

Android:我应该如何处理数据库访问异常?

scala - 如何根据某些条件从 Spark 数据框中获取几行

基本类型的 Scala 上限类型

apache-spark - UserWarning : pyarrow. open_stream 已弃用,请使用 pyarrow.ipc.open_stream 警告

delphi - 我什么时候应该使用 "try" block ,我应该使用哪种?

python - 当前异常上下文掩盖了先前的错误

scala - 如何访问--files指定的文件?