scala - 递归数据框操作

标签 scala hadoop apache-spark

在我的 spark 应用程序中,我想在循环中对数据帧执行操作并将结果写入 hdfs。

伪代码:

var df = emptyDataframe
for n = 1 to 200000{
  someDf=read(n)
  df = df.mergeWith(somedf)
}
df.writetohdfs

在上面的示例中,当“mergeWith”执行 unionAll 时,我得到了很好的结果。

但是,当我在“mergeWith”中进行(简单的)连接时,工作变得非常慢(> 1h,有 2 个执行器,每个执行器有 4 个内核)并且永远不会完成(工作自行中止)。

在我的场景中,我对仅包含 ~1mb 文本数据的文件进行了约 50 次迭代。

因为合并顺序对我来说很重要,我怀疑这是由于 DAG 生成导致的,导致整个事情在我存储数据的那一刻运行。

现在我正在尝试在合并的数据框架上使用 .persist,但这似乎也很慢。

编辑:

在作业运行时,我注意到(即使我进行了计数和 .persist)内存中的数据帧看起来不像静态数据帧。 它看起来像是一条通往它一直在进行的所有合并的串联路径,有效地线性减慢了工作速度。

我是否可以假设 var df 是造成这种情况的罪魁祸首?

spiraling out of controle

我对问题的分割:

dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....

当我期望 DF' A C 和 D 是对象时,以不同的方式激发事物并且不关心我是否坚持或重新分区。 Spark 看起来像这样:

dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....

更新2

为了摆脱持续不在 DF 上工作的问题,我可以在将 DF 转换为 RDD 并再次转换回来时“打破”沿袭。 这有一点开销,但可以接受(工作在几分钟内完成,而不是几小时/从不) 我将对持久性进行更多测试,并以解决方法的形式给出答案。

结果: 这似乎只能在表面上解决这些问题。实际上,我回到了原点并得到了 OOM 异常java.lang.OutOfMemoryError: GC overhead limit exceeded

最佳答案

如果你有这样的代码:

var df = sc.parallelize(Seq(1)).toDF()

for(i<- 1 to 200000) {
  val df_add = sc.parallelize(Seq(i)).toDF()
  df = df.unionAll(df_add)
}

然后 df 之后将有 400000 个分区,这使得以下操作效率低下(因为每个分区有 1 个任务)。

尝试减少分区数量,例如200 在保留数据帧之前(使用例如 df.coalesce(200).write.saveAsTable(....))

关于scala - 递归数据框操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40736993/

相关文章:

scala - 将 Scala 从 2.12 降级到 2.11.x

java - 如何在 Java 的 Spark Streaming 中解析复杂的 JSON 数据

java - Apache Spark (Java) 中列的自定义处理

apache-spark - 了解Spark Shuffle溢出

scala - 如何在 build.sbt 中使用环境变量?

json - Playframeworks json Writes implicit 需要显式类型,为什么?

generics - Scala 集合中的泛型类

hadoop - 是否需要有关 hive 中的交叉连接的建议以从具有1.6亿行的大表中获取通配符单词?

java - Apache Pig:轻量级Java客户端

hadoop - 如何让EMR先执行customer jar