我在 for 循环中生成小数据帧。在每一轮 for 循环中,我将生成的数据帧传递给返回 double 的函数。这个简单的过程(我认为垃圾收集器可以轻松处理)让我内存犹新。当我在每一轮 for 循环中查看 Spark UI 时,它会添加一个新的“SQL{1-500}”(我的循环运行 500 次)。我的问题是如何在生成新对象之前删除此 sql 对象?
我的代码是这样的:
Seq.fill(500){
val data = (1 to 1000).map(_=>Random.nextInt(1000))
val dataframe = createDataFrame(data)
myFunction(dataframe)
dataframe.unpersist()
}
def myFunction(df: DataFrame)={
df.count()
}
我试图通过 dataframe.unpersist() 和 sqlContext.clearCache() 解决这个问题,但它们都不起作用。
最佳答案
你有两个地方我怀疑发生了可疑的事情:
- 在
myFunction
的定义中:您确实需要将=
放在定义主体之前。我在编译时遇到了这样的拼写错误,但产生了非常奇怪的错误(请注意,我出于调试目的更改了您的 myFunction) - 最好用你知道的东西填充你的 Seq,然后申请 foreach 或类似的东西
(您还需要将 random.nexInt
替换为 Random.nextInt
,而且,您只能从子类型类型的 Seq 创建 DataFrame Product的,比如元组,需要用到sqlContext才能使用createDataFrame)
这段代码没有内存问题:
Seq.fill(500)(0).foreach{ i =>
val data = {1 to 1000}.map(_.toDouble).toList.zipWithIndex
val dataframe = sqlContext.createDataFrame(data)
myFunction(dataframe)
}
def myFunction(df: DataFrame) = {
println(df.count())
}
编辑:并行计算(跨越 10 个核心)并返回计数的 RDD:
sc.parallelize(Seq.fill(500)(0), 10).map{ i =>
val data = {1 to 1000}.map(_.toDouble).toList.zipWithIndex
val dataframe = sqlContext.createDataFrame(data)
myFunction(dataframe)
}
def myFunction(df: DataFrame) = {
df.count()
}
编辑 2:使用 =
和不使用 =
声明函数 myFunction
的区别在于第一个是(通常)函数定义,而另一个是过程定义,仅用于返回 Unit 的方法。参见 explanation .这是 Spark-shell 中说明的这一点:
scala> def myf(df:DataFrame) = df.count()
myf: (df: org.apache.spark.sql.DataFrame)Long
scala> def myf2(df:DataFrame) { df.count() }
myf2: (df: org.apache.spark.sql.DataFrame)Unit
关于sql - 在 Scala Spark 的 for 循环中生成数据帧导致内存不足,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34114131/