scala - 调用 stddev 超过 1,000 列时 SparkSQL 作业失败

标签 scala apache-spark databricks

我使用的是带有 Spark 2.2.1 和 Scala 2.11 的 DataBricks。我正在尝试运行如下所示的 SQL 查询。

select stddev(col1), stddev(col2), ..., stddev(col1300)
from mydb.mytable

然后我执行代码如下。

myRdd = sqlContext.sql(sql)

但是,我看到抛出以下异常。

Job aborted due to stage failure: Task 24 in stage 16.0 failed 4 times, most recent failure: Lost task 24.3 in stage 16.0 (TID 1946, 10.184.163.105, executor 3): org.codehaus.janino.JaninoRuntimeException: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection has grown past JVM limit of 0xFFFF
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private InternalRow mutableRow;
/* 009 */   private boolean evalExprIsNull;
/* 010 */   private boolean evalExprValue;
/* 011 */   private boolean evalExpr1IsNull;
/* 012 */   private boolean evalExpr1Value;
/* 013 */   private boolean evalExpr2IsNull;
/* 014 */   private boolean evalExpr2Value;
/* 015 */   private boolean evalExpr3IsNull;
/* 016 */   private boolean evalExpr3Value;
/* 017 */   private boolean evalExpr4IsNull;
/* 018 */   private boolean evalExpr4Value;
/* 019 */   private boolean evalExpr5IsNull;
/* 020 */   private boolean evalExpr5Value;
/* 021 */   private boolean evalExpr6IsNull;

The stacktrace just goes on and on, and even the Databricks notebook crashes because of the verbosity. Anyone ever seen this?

Also, I have the following 2 SQL statements to get the average and median that I execute without any problems.

select avg(col1), ..., avg(col1300) from mydb.mytable
select percentile_approx(col1, 0.5), ..., percentile_approx(col1300, 0.5) from mydb.mytable

问题似乎出在 stddev 上,但异常没有帮助。对发生的事情有什么想法吗?是否有另一种方法可以轻松计算标准差而不会导致此问题?

事实证明这是post描述了同样的问题,称由于 64KB 大小的类的限制,Spark 无法处理宽模式或大量列。但是,如果是这样,那么为什么 avgpercentile_approx 有效?

最佳答案

几个选项:

  • 尝试禁用整个阶段代码生成:

    spark.conf.set("spark.sql.codegen.wholeStage", false)
    
  • 如果上述方法无法帮助切换到 RDD(由 this answerzeo323 采用):

    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
    
    val columns: Seq[String] = ???
    
    df
      .select(columns map (col(_).cast("double")): _*)
      .rdd
      .map(row => Vectors.dense(columns.map(row.getAs[Double](_)).toArray))
      .aggregate(new MultivariateOnlineSummarizer)(
         (agg, v) => agg.add(v), 
         (agg1, agg2) => agg1.merge(agg2))
    
  • 使用 VectorAssemblerAggregator 将列组装成单个向量,类似于here ,调整 finish 方法(您可能需要一些额外的调整才能将 ml.linalg.Vectors 转换为 mllib.linalg.Vectors)。

However, if that's the case, then why does avg and percentile_approx work?

Spark 实际上会为这些阶段生成 Java 代码。由于逻辑不一样,输出大小也会不同。

关于scala - 调用 stddev 超过 1,000 列时 SparkSQL 作业失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50425948/

相关文章:

java - Univocity 解析器 - 迭代器方式生成 scala 案例类

databricks - 通过 dbutils 移动文件时服务器端复制失败

apache-spark - 从 Databricks Notebook 发送带有附件的电子邮件

python - databricks dbfs 是否支持文件元数据,例如文件/文件夹创建日期或修改日期

list - Scala 中列表串联(:::)的复杂性?

scala - 在spark Scala的新行中添加两个日期之间的所有日期(周)

apache-spark - Spark : How to group by distinct values in DataFrame

scala - MonotonicallyIncreasingId 输出变化范围广泛

apache-spark - dynamic.partition=True 和dynamic.partition.mode = nonstrict 有什么区别?

scala - 如何向上转换 monad 变压器类型?