sql - 在 Scala Spark 的 for 循环中生成数据帧导致内存不足

标签 sql scala apache-spark

我在 for 循环中生成小数据帧。在每一轮 for 循环中,我将生成的数据帧传递给返回 double 的函数。这个简单的过程(我认为垃圾收集器可以轻松处理)让我内存犹新。当我在每一轮 for 循环中查看 Spark UI 时,它会添加一个新的“SQL{1-500}”(我的循环运行 500 次)。我的问题是如何在生成新对象之前删除此 sql 对象?

我的代码是这样的:

Seq.fill(500){
  val data = (1 to 1000).map(_=>Random.nextInt(1000))
  val dataframe = createDataFrame(data)
  myFunction(dataframe)
  dataframe.unpersist()
}

def myFunction(df: DataFrame)={
  df.count()
}

我试图通过 dataframe.unpersist() 和 sqlContext.clearCache() 解决这个问题,但它们都不起作用。

最佳答案

你有两个地方我怀疑发生了可疑的事情:

  • myFunction 的定义中:您确实需要将 = 放在定义主体之前。我在编译时遇到了这样的拼写错误,但产生了非常奇怪的错误(请注意,我出于调试目的更改了您的 myFunction)
  • 最好用你知道的东西填充你的 Seq,然后申请 foreach 或类似的东西

(您还需要将 random.nexInt 替换为 Random.nextInt,而且,您只能从子类型类型的 Seq 创建 DataFrame Product的,比如元组,需要用到sqlContext才能使用createDataFrame)

这段代码没有内存问题:

Seq.fill(500)(0).foreach{ i => 
  val data = {1 to 1000}.map(_.toDouble).toList.zipWithIndex
  val dataframe = sqlContext.createDataFrame(data)
  myFunction(dataframe)
}

def myFunction(df: DataFrame) = {
  println(df.count())
}

编辑:并行计算(跨越 10 个核心)并返回计数的 RDD:

sc.parallelize(Seq.fill(500)(0), 10).map{ i => 
  val data = {1 to 1000}.map(_.toDouble).toList.zipWithIndex
  val dataframe = sqlContext.createDataFrame(data)
  myFunction(dataframe)
}

def myFunction(df: DataFrame) = {
  df.count()
}

编辑 2:使用 = 和不使用 = 声明函数 myFunction 的区别在于第一个是(通常)函数定义,而另一个是过程定义,仅用于返回 Unit 的方法。参见 explanation .这是 Spark-shell 中说明的这一点:

scala> def myf(df:DataFrame) = df.count()
myf: (df: org.apache.spark.sql.DataFrame)Long

scala> def myf2(df:DataFrame) { df.count() }
myf2: (df: org.apache.spark.sql.DataFrame)Unit

关于sql - 在 Scala Spark 的 for 循环中生成数据帧导致内存不足,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34114131/

相关文章:

mysql - SQL 查找每个地区每个学生交付的平均公斤数

mysql - mysql中如何查找给定值出现在哪些表中

java - 使用 Apache Spark 重新分区

scala - 如何区分组合和自类型用例

同一文件中的Scala特征和类

python - 如何使用 .whl 文件调用 pyspark 代码?

scala - org.apache.spark.SparkException : Failed to execute user defined function

sql - 使用表达式选择多个类似的命名字段

sql - Postgres : If we select a computed column multiple times, Postgres会一次又一次地计算它吗?

eclipse - 在heroku上部署scalatra应用程序错误找不到插件