在我的 spark 应用程序中,我想在循环中对数据帧执行操作并将结果写入 hdfs。
伪代码:
var df = emptyDataframe
for n = 1 to 200000{
someDf=read(n)
df = df.mergeWith(somedf)
}
df.writetohdfs
在上面的示例中,当“mergeWith”执行 unionAll 时,我得到了很好的结果。
但是,当我在“mergeWith”中进行(简单的)连接时,工作变得非常慢(> 1h,有 2 个执行器,每个执行器有 4 个内核)并且永远不会完成(工作自行中止)。
在我的场景中,我对仅包含 ~1mb 文本数据的文件进行了约 50 次迭代。
因为合并顺序对我来说很重要,我怀疑这是由于 DAG 生成导致的,导致整个事情在我存储数据的那一刻运行。
现在我正在尝试在合并的数据框架上使用 .persist,但这似乎也很慢。
编辑:
在作业运行时,我注意到(即使我进行了计数和 .persist)内存中的数据帧看起来不像静态数据帧。 它看起来像是一条通往它一直在进行的所有合并的串联路径,有效地线性减慢了工作速度。
我是否可以假设 var df
是造成这种情况的罪魁祸首?
我对问题的分割:
dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....
当我期望 DF' A C 和 D 是对象时,以不同的方式激发事物并且不关心我是否坚持或重新分区。 Spark 看起来像这样:
dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....
更新2
为了摆脱持续不在 DF 上工作的问题,我可以在将 DF 转换为 RDD 并再次转换回来时“打破”沿袭。 这有一点开销,但可以接受(工作在几分钟内完成,而不是几小时/从不) 我将对持久性进行更多测试,并以解决方法的形式给出答案。
结果:
这似乎只能在表面上解决这些问题。实际上,我回到了原点并得到了 OOM 异常java.lang.OutOfMemoryError: GC overhead limit exceeded
最佳答案
如果你有这样的代码:
var df = sc.parallelize(Seq(1)).toDF()
for(i<- 1 to 200000) {
val df_add = sc.parallelize(Seq(i)).toDF()
df = df.unionAll(df_add)
}
然后 df 之后将有 400000 个分区,这使得以下操作效率低下(因为每个分区有 1 个任务)。
尝试减少分区数量,例如200 在保留数据帧之前(使用例如 df.coalesce(200).write.saveAsTable(....)
)
关于scala - 递归数据框操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40736993/