scala - 为什么 spark-shell 在使用 3000 列的 DataFrame 后打印数千行代码?什么是 JaninoRuntimeException 和 64 KB?

标签 scala apache-spark

(用 spark-2.1.0-bin-hadoop2.7 本地机官网版本)

当我在 spark-shell 中执行一个简单的 spark 命令时,它会在抛出错误之前开始打印出成千上万行代码。这些“代码”是什么?

我在本地机器上运行 spark。我运行的命令很简单 df.count哪里df是一个数据帧。

请看下面的截图(代码飞得太快,我只能截图看看发生了什么)。更多细节在图片下方。 enter image description here

更多细节:

我创建了数据框 df经过

val df: DataFrame = spark.createDataFrame(rows, schema)
// rows: RDD[Row]
// schema: StructType
// There were about 3000 columns and 700 rows (testing set) of data in df. 
// The following line ran successfully and returned the correct value
rows.count
// The following line threw exception after printing out tons of codes as shown in the screenshot above
df.count

“代码”后抛出的异常是:
...
/* 181897 */     apply_81(i);
/* 181898 */     result.setTotalSize(holder.totalSize());
/* 181899 */     return result;
/* 181900 */   }
/* 181901 */ }

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959) 

编辑:正如@TzachZohar 指出的那样,这看起来像是已修复但未从 spark 项目中发布的已知错误之一 ( https://issues.apache.org/jira/browse/SPARK-16845 )。

我拉了 spark master,从源头构建它,然后重试我的例子。现在我在生成的代码之后遇到了一个新的异常:
/* 308608 */     apply_1560(i);
/* 308609 */     apply_1561(i);
/* 308610 */     result.setTotalSize(holder.totalSize());
/* 308611 */     return result;
/* 308612 */   }
/* 308613 */ }

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:998)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:995)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)

看起来拉取请求正在解决第二个问题:https://github.com/apache/spark/pull/16648

最佳答案

这是一个错误。它与在 JVM 上生成的运行时代码有关。所以Scala团队似乎很难解决。 (关于 JIRA 有很多讨论)。

我在进行行操作时发生了错误。即使 df.head() 在 700 行的数据帧上也会导致异常。

我的解决方法是将数据帧转换为稀疏数据 RDD(即 RDD[LabeledPoint])并在 RDD 上运行 rowwise 操作。它更快,内存效率更高。然而,它只适用于数字数据。分类变量(因子、目标等)需要转换为 Double。

也就是说,我自己是 Scala 的新手,所以我的代码可能有点业余。但它有效。

创建行

@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint =
{
  try
  {
    logger.info(s"fieldNameSeq $fieldNameSeq")
    val values: Map[String, Long] = rowIn.getValuesMap(fieldNameSeq)

    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)

    //println(s"convertRowToLabeledPoint row values ${sortedValuesMap}")
    print(".")

    val rowValuesItr: Iterable[Long] = sortedValuesMap.values

    var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0


    rowValuesItr.foreach
    {
      kv =>
        if (kv > 0)
        {
          valuesArray += kv.toDouble;
          positionsArray += currentPosition;
        }
        currentPosition = currentPosition + 1;
    }

    new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size, positionsArray.toArray, valuesArray.toArray))
  }
  catch
  {
    case ex: Exception =>
    {
      throw new Exception(ex)
    }
  }
}

private def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame =
{

  //println("castColumnTo")
  df.withColumn(cn, df(cn).cast(tpe)

  )
}

提供一个 Dataframe 并返回 RDD LabeledPOint
@throws(classOf[Exception])
 def convertToLibSvm(spark:SparkSession,mDF : DataFrame, targetColumnName:String): RDD[LabeledPoint] =
{
  try
  {


    val fieldSeq: scala.collection.Seq[StructField] = mDF.schema.fields.toSeq.filter(f => f.dataType == IntegerType || f.dataType == LongType)
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)


    val indexer = new StringIndexer()
      .setInputCol(targetColumnName)
      .setOutputCol(targetColumnName+"_Indexed")
    val mDFTypedIndexed = indexer.fit(mDF).transform(mDF).drop(targetColumnName)
    val mDFFinal = castColumnTo(mDFTypedIndexed, targetColumnName+"_Indexed", IntegerType)

    //mDFFinal.show()
    //only doubles accepted by sparse vector, so that's what we filter for


    var positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()

    mDFFinal.collect().foreach
    {

      row => positionsArray += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs(targetColumnName+"_Indexed"));

    }

    spark.sparkContext.parallelize(positionsArray.toSeq)

  }
  catch
  {
    case ex: Exception =>
    {
      throw new Exception(ex)
    }
  }
}

关于scala - 为什么 spark-shell 在使用 3000 列的 DataFrame 后打印数千行代码?什么是 JaninoRuntimeException 和 64 KB?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43481291/

相关文章:

azure - Spark作业最后无法删除其临时文件夹

从 Databricks 访问 Azure 数据存储

apache-spark - "No Filesystem for Scheme: gs"在本地运行 spark 作业时

scala - 在读取 CSV 时,最后一列在 Spark、Scala 中显示为 Null

scala,返回类型指南 - 当更喜欢 seq、可迭代、可遍历时

scala - 在哪里放置定期执行的更新参与者内部状态的计算?

unit-testing - 如何在 Play Framework v2 中指定用于测试目的的自定义数据库连接参数?

scala - 当对其中一项的检查返回 false 时结束 for-comprehension 循环

apache-spark - 有没有办法在Java Spark 2.1中进行广播联接

scala - 有没有一种好方法可以将 Spark 中的流与更改表连接起来?