我一直在使用 Spark Dataset API 对 JSON 执行操作以根据需要提取某些字段。但是,当我提供的让 spark 知道要提取哪个字段的规范出错时,spark 会吐出一个
org.apache.spark.sql.AnalysisException
如何在这样的分布式处理场景中处理未经检查的运行时异常?我知道抛出一个 try-catch 可以解决问题,但是处理这种情况的推荐方法是什么
dataset = dataset.withColumn(current, functions.explode(dataset.col(parent + Constants.PUNCTUATION_PERIOD + child.substring(0, child.length() - 2))));
最佳答案
在 Scala 中,您应该简单地将调用包装在 Try
中并管理失败。像这样的东西:
val result = Try(executeSparkCode()) match {
case s: Success(_) => s;
case Failure(error: AnalysisException) => Failure(new MyException(error));
}
注意 1:如果您的问题暗示如何在 Scala 中管理异常,那么有很多文档和帖子都是关于这个主题的(即不要抛出)。例如,您可以检查 that answer (of mine)
注意 2:我这里没有 scala 开发环境,所以我没有测试这段代码)
然而在 Java 中有一个棘手的情况:编译器不期望一个未检查的 AnalysisException,所以你不能专门捕获这个异常。可能是一些 scala/java 误解,因为 scala 不跟踪已检查的异常。我所做的是:
try{
return executeSparkCode();
} catch (Exception ex) {
if(ex instanceOf AnalysisException){
throw new MyException(ex);
} else {
throw ex; // unmanaged exceptions
}
}
注意:在我的例子中,我还针对我必须管理的特定异常(即“路径不存在”)测试了错误消息的内容,在这种情况下,我返回一个空数据集而不是抛出另一个异常。我正在寻找更好的解决方案,碰巧到了这里......
关于java - 如何正确处理 spark.sql.AnalysisException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51124743/