I'm using Databricks with Spark 2.2.1 and Scala 2.11. I'm trying to run a SQL query that looks like the following.
select stddev(col1), stddev(col2), ..., stddev(col1300)
from mydb.mytable
Then I execute the code as follows.
val myRdd = sqlContext.sql(sql)
However, I see the following exception thrown.
Job aborted due to stage failure: Task 24 in stage 16.0 failed 4 times, most recent failure: Lost task 24.3 in stage 16.0 (TID 1946, 10.184.163.105, executor 3): org.codehaus.janino.JaninoRuntimeException: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection has grown past JVM limit of 0xFFFF
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private InternalRow mutableRow;
/* 009 */   private boolean evalExprIsNull;
/* 010 */   private boolean evalExprValue;
/* 011 */   private boolean evalExpr1IsNull;
/* 012 */   private boolean evalExpr1Value;
/* 013 */   private boolean evalExpr2IsNull;
/* 014 */   private boolean evalExpr2Value;
/* 015 */   private boolean evalExpr3IsNull;
/* 016 */   private boolean evalExpr3Value;
/* 017 */   private boolean evalExpr4IsNull;
/* 018 */   private boolean evalExpr4Value;
/* 019 */   private boolean evalExpr5IsNull;
/* 020 */   private boolean evalExpr5Value;
/* 021 */   private boolean evalExpr6IsNull;
The stacktrace just goes on and on, and even the Databricks notebook crashes because of the verbosity. Anyone ever seen this?
Also, I have the following two SQL statements, which compute the average and median, and both execute without any problems.
select avg(col1), ..., avg(col1300) from mydb.mytable
select percentile_approx(col1, 0.5), ..., percentile_approx(col1300, 0.5) from mydb.mytable
The problem seems to be with stddev, but the exception isn't helpful. Any idea what is happening? Is there another way to compute the standard deviation easily that won't run into this problem?
It turns out this post describes the same problem: because generated classes are limited to 64KB, Spark cannot handle wide schemas or large numbers of columns. However, if that's the case, then why do avg and percentile_approx work?
Best Answer
A couple of options:
Try disabling whole-stage code generation:
spark.conf.set("spark.sql.codegen.wholeStage", false)
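This setting only affects the current session, so after flipping it you can simply re-run the original query. A minimal sketch (assuming sql still holds the 1,300-column stddev statement from the question):

// With whole-stage codegen disabled, re-run the original query.
val myDf = sqlContext.sql(sql) // `sql` is the stddev statement from above
myDf.show()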
If the above doesn't help, switch to RDDs (adapted from this answer by zero323):
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.sql.functions.col

val columns: Seq[String] = ???

df
  .select(columns map (col(_).cast("double")): _*)
  .rdd
  .map(row => Vectors.dense(columns.map(row.getAs[Double](_)).toArray))
  .aggregate(new MultivariateOnlineSummarizer)(
    (agg, v) => agg.add(v),
    (agg1, agg2) => agg1.merge(agg2))
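As a follow-up (my addition, not part of the linked answer): the MultivariateOnlineSummarizer returned by the aggregate call above exposes per-column sample variances, so the standard deviations are just a square root away. Note that variance here is the sample variance, which is what SQL's stddev (an alias for stddev_samp) computes.

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// `summary` is the MultivariateOnlineSummarizer produced by the
// .aggregate(...) call above; sample variance -> sample stddev.
def stddevs(summary: MultivariateOnlineSummarizer): Array[Double] =
  summary.variance.toArray.map(math.sqrt)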
Assemble the columns into a single vector using VectorAssembler and an Aggregator, similar to here, adjusting the finish method (you may need some extra tweaking to convert ml.linalg.Vectors to mllib.linalg.Vectors). A sketch of this approach follows.
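Here is a minimal sketch of what that could look like. This is my illustration, not code from the linked answer: the StddevAgg object, the encoder choices, and df (the DataFrame for mydb.mytable, with numeric columns) are all assumptions. The reduce step converts each assembled ml.linalg vector back to mllib.linalg via Vectors.fromML, and finish turns per-column sample variances into standard deviations.

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator: buffers a MultivariateOnlineSummarizer and
// finishes with per-column sample standard deviations.
object StddevAgg extends Aggregator[Vector, MultivariateOnlineSummarizer, Array[Double]] {
  def zero: MultivariateOnlineSummarizer = new MultivariateOnlineSummarizer
  def reduce(buf: MultivariateOnlineSummarizer, v: Vector): MultivariateOnlineSummarizer =
    buf.add(OldVectors.fromML(v))           // ml.linalg -> mllib.linalg
  def merge(b1: MultivariateOnlineSummarizer, b2: MultivariateOnlineSummarizer): MultivariateOnlineSummarizer =
    b1.merge(b2)
  def finish(buf: MultivariateOnlineSummarizer): Array[Double] =
    buf.variance.toArray.map(math.sqrt)     // sample variance -> stddev
  def bufferEncoder: Encoder[MultivariateOnlineSummarizer] =
    Encoders.kryo[MultivariateOnlineSummarizer]
  def outputEncoder: Encoder[Array[Double]] = ExpressionEncoder()
}

val columns: Seq[String] = ???              // the 1,300 column names

// Assemble all columns into one vector column, then aggregate it.
val assembler = new VectorAssembler()
  .setInputCols(columns.toArray)
  .setOutputCol("features")

implicit val vecEncoder: Encoder[Vector] = ExpressionEncoder()

val stddevs: Array[Double] = assembler.transform(df)
  .select("features")
  .as[Vector]
  .select(StddevAgg.toColumn)
  .first()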
However, if that's the case, then why do avg and percentile_approx work?
Spark actually generates Java code for those stages as well, but since the logic is different, the size of the generated output is different too.
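If you want to see the difference for yourself, Spark's debug package can dump the generated Java for a plan so you can compare the two. A quick sketch; debugCodegen is a developer API available in Spark 2.x:

// Dump the generated Java for each plan and compare the sizes.
import org.apache.spark.sql.execution.debug._

sqlContext.sql("select avg(col1), avg(col2) from mydb.mytable").debugCodegen()
sqlContext.sql("select stddev(col1), stddev(col2) from mydb.mytable").debugCodegen()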
Regarding scala - SparkSQL job fails when calling stddev over 1,000 columns, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50425948/