apache-spark - Spark 代码组织和最佳实践

标签 apache-spark functional-programming code-organization

关闭。这个问题是opinion-based .它目前不接受答案。












想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题.

3年前关闭。




Improve this question




因此,在面向对象的世界中度过了多年并始终考虑代码重用、设计模式和最佳实践,我发现自己在 Spark 世界中的代码组织和代码重用方面有些挣扎。

如果我尝试以可重用的方式编写代码,它几乎总是伴随着性能成本,我最终将其重写为最适合我的特定用例的任何代码。这种“为这个特定用例编写最佳内容”的常量也会影响代码组织,因为当“它们真的属于一起”时,将代码拆分为不同的对象或模块是很困难的,因此我最终得到的“上帝”对象很少包含长复杂的转换链。事实上,我经常认为,如果我在面向对象的世界中工作时查看了我现在编写的大部分 Spark 代码,我会畏缩并认为它是“意大利面条式代码”。

我在网上冲浪,试图找到某种与面向对象世界的最佳实践等效的方法,但运气不佳。我可以找到一些函数式编程的“最佳实践”,但 Spark 只是增加了一个额外的层,因为性能在这里是一个主要因素。

所以我要问你的问题是,你们中的任何一位 Spark 大师是否找到了一些可以推荐的编写 Spark 代码的最佳实践?

编辑

正如评论中所写,我实际上并不期望有人发布有关如何解决此问题的答案,而是希望这个社区中的某个人遇到了某种 Martin Fowler 类型的人,他在某处写过一些文章或博客文章关于如何解决 Spark 世界中的代码组织问题。

@DanielDarabos 建议我可以举一个代码组织和性能发生冲突的情况的例子。虽然我发现我在日常工作中经常遇到这个问题,但我发现将其归结为一个很好的最小示例有点困难;)但我会尝试。

在面向对象的世界里,我是单一职责原则的忠实粉丝,所以我会确保我的方法只负责一件事。它使它们可重复使用且易于测试。因此,如果我必须,例如,计算列表中某些数字的总和(匹配某些标准)并且我必须计算相同数字的平均值,我肯定会创建两种方法 - 一种计算总和,另一种计算总和计算平均值。像这样:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

我当然可以继续在 Spark 中尊重 SRP:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

但是因为我的df可能包含数十亿行我宁愿不必执行 filter两次。事实上,性能与 EMR 成本直接相关,所以我真的不想要那样。为了克服它,我因此决定违反 SRP 并简单地将两个函数合二为一,并确保我在国家过滤的 DataFrame 上调用 persist , 像这样:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

现在,这个例子当然是对现实生活中遇到的事情的极大简化。在这里我可以简单地通过过滤和持久化来解决它 df在将它交给 sum 和 avg 函数之前(这也将是更多的 SRP),但在现实生活中可能会有一些中间计算需要一次又一次地进行。换句话说,filter这里的函数只是试图为一些可以从持久化中受益的东西做一个简单的例子。事实上,我认为调用 persist是这里的关键字。调用 persist将大大加快我的工作速度,但代价是我必须将所有依赖于持久化 DataFrame 的代码紧密结合起来。 - 即使它们在逻辑上是分开的。

最佳答案

关于apache-spark - Spark 代码组织和最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32777014/

相关文章:

Scala 类型检查编译错误

scala - 它如何是自然的转变?

code-organization - 组织科学数据和代码——实验、模型、模拟、实现

c++ - 如何在包含的 C++ 源代码中写入外部文件的路径

database - 按应用程序主题组织 SQL 文件值得头疼吗?

python - 如何在 PySpark groupByKey() 中对迭代器中的值求和

scala - 如何序列化elastic4s ElasticSearch客户端以与Spark RDD一起运行?

functional-programming - OCaml 中的模式匹配函数

scala - Spark 指数移动平均线

scala - 最简单程序的大型任务