scala - 在 Spark 中操作大量列时出现 StackOverflowError

标签 scala apache-spark mapreduce spark-dataframe stack-overflow

我有一个宽数据框(130000 行 x 8700 列),当我尝试对所有列求和时,我收到以下错误:

Exception in thread "main" java.lang.StackOverflowError at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183) at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:49) at org.apache.spark.sql.catalyst.expressions.BinaryExpression.children(Expression.scala:400) at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:88) ...



这是我的 Scala 代码:
  val df = spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .option("inferSchema", "true")
    .csv("D:\\Documents\\Trabajo\\Fábregas\\matrizLuna\\matrizRelativa")


  val arrayList = df.drop("cups").columns
  var colsList = List[Column]()
  arrayList.foreach { c => colsList :+= col(c) }

  val df_suma = df.withColumn("consumo_total", colsList.reduce(_ + _))

如果我对几列执行相同的操作,它可以正常工作,但是当我尝试对大量列进行归约操作时,我总是会遇到相同的错误。

谁能建议我该怎么做?列数有限制吗?

谢谢!

最佳答案

您可以使用不同的缩减方法来生成深度平衡的二叉树 O(log(n))而不是退化的线性 BinaryExpression深度链O(n) :

def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
  case Nil => throw new IllegalArgumentException("Cannot reduce empty list")
  case List(x) => x
  case xs => {
    val n = xs.size
    val (as, bs) = list.splitAt(n / 2)
    op(balancedReduce(as)(op), balancedReduce(bs)(op))
  }
}

现在在您的代码中,您可以替换
colsList.reduce(_ + _)

经过
balancedReduce(colsList)(_ + _)

一个小例子来进一步说明 BinaryExpression 会发生什么。 s,可以编译,没有任何依赖:
sealed trait FormalExpr
case class BinOp(left: FormalExpr, right: FormalExpr) extends FormalExpr {
  override def toString: String = {
    val lStr = left.toString.split("\n").map("  " + _).mkString("\n")
    val rStr = right.toString.split("\n").map("  " + _).mkString("\n")
    return s"BinOp(\n${lStr}\n${rStr}\n)"
  }
}
case object Leaf extends FormalExpr

val leafs = List.fill[FormalExpr](16){Leaf}

println(leafs.reduce(BinOp(_, _)))
println(balancedReduce(leafs)(BinOp(_, _)))

这就是普通的reduce确实(这就是您的代码中本质上发生的事情):
BinOp(
  BinOp(
    BinOp(
      BinOp(
        BinOp(
          BinOp(
            BinOp(
              BinOp(
                BinOp(
                  BinOp(
                    BinOp(
                      BinOp(
                        BinOp(
                          BinOp(
                            BinOp(
                              Leaf
                              Leaf
                            )
                            Leaf
                          )
                          Leaf
                        )
                        Leaf
                      )
                      Leaf
                    )
                    Leaf
                  )
                  Leaf
                )
                Leaf
              )
              Leaf
            )
            Leaf
          )
          Leaf
        )
        Leaf
      )
      Leaf
    )
    Leaf
  )
  Leaf
)

这就是 balancedReduce产生:
BinOp(
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
)

线性化链的长度为 O(n) ,当 Catalyst 试图评估它时,它会破坏堆栈。深度扁平树不应该发生这种情况 O(log(n)) .

当我们谈论渐近运行时:为什么要附加到可变 colsList ?这需要O(n^2)时间。为什么不直接调用toList .columns 的输出?

关于scala - 在 Spark 中操作大量列时出现 StackOverflowError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49691021/

相关文章:

scala.io.Source.fromInputStream 不会进一步抛出 MalformedInputException

java - Scala 中 '+' 、 '*' 、 '/' (算术运算符)等基本方法的实现在哪里?

apache-spark - 从Spark Dataframe获取表名称

hadoop - 如何将 AvroKeyValueOutputFormat 文件导入配置单元?

python - 声明 mrjob 映射器而不忽略 key

hadoop - 启动 MapReduce 作业的不同方式

macos - 无法在 Mac 上安装 sbt

scala - 什么取代了 scala 中的类变量?

python - 如何使用 approx_count_distinct 计算 Spark DataFrame 中两列的不同组合?

hadoop - 为什么创建了很多 spark-warehouse 文件夹?