scala - Spark,在 DataFrame(或 RDD)上多次应用过滤器,无需冗余评估

标签 scala apache-spark lazy-evaluation

我有一个 Spark DataFrame,需要对父 RDD 的链接进行大量评估。

val df: DataFrame[(String, Any)] = someMethodCalculatingDF()
val out1 = df.filter(_._1 == "Key1").map(_._2).collect()
val out2 = df.filter(_._1 == "Key2").map(_._2)

out1 是一个非常小的数据(每个分区中一到两行)并被收集以供进一步使用。 out2 是一个 Dataframe,将用于生成另一个稍后将具体化的 RDD。 因此,df 将被评估两次,这是很重的。

缓存可能是一个解决方案,但在我的应用程序中,它不会,因为数据可能非常非常大。内存就会溢出。

有没有天才:)谁可以提出另一种绕过冗余评估的方法?

最佳答案

这实际上是我们集群中每天都会发生的场景。根据我们的经验,这种方法最适合我们。

当我们需要两次使用相同的计算数据帧(在不同的分支上)时,我们执行以下操作:

  1. 计算阶段很繁重,导致数据帧相当小 -> 缓存它。

  2. 计算阶段很轻,导致数据帧很大 -> 让它计算两次。

  3. 计算量很大,导致数据帧很大 -> 将其写入磁盘(HDFS 或 S3),将分割点上的作业分割为两个不同的批处理作业。在这种情况下,您不会重复繁重的计算,也不会粉碎您的缓存(这两种方式都可能会使用磁盘)。

  4. 计算阶段很轻,导致数据帧很小。你的生活很好,你可以回家了:)。

关于scala - Spark,在 DataFrame(或 RDD)上多次应用过滤器,无需冗余评估,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58869344/

相关文章:

python - Python中的惰性求值

arrays - 如何对 Spark 数据帧中嵌套数组中结构的值求和?

gradle - 在Gradle中,如何执行对延迟评估的属性(在扩展名上)的验证?

scala - DenseVector 卷积,如何进行。(Scala Breeze)

选项上的 Scala 模式匹配

java - Apache Spark : Effectively using mapPartitions in Java

apache-spark - Hadoop 与 Spark 澄清

r - 惰性计算 : Why can't I use plot(. .., xlim = c(0,1), ylim = xlim)?

scala - 如何处理 Scala 中的类型名称冲突?

python - Scala ctrl c 不离开 REPL