scala - Apache Spark : Transforming large DataFrame efficiently

标签 scala apache-spark apache-spark-sql

我有一个 1GB 的 csv 文件,我使用 DataFrame API 加载该文件。我还实现了一个自定义 Transformer 来准备数据,以便可以由 Estimator 进行处理。

transform 方法正在执行一些不同的操作:

  • 类型转换柱。
  • 过滤行。
  • 删除列。
  • 创建新列,对其他列应用函数。

我担心此过程中的内存使用情况。如果在每次转换后将结果存储在变量中会发生什么?例如(简化):

override def transform(dataset: Dataset[_]): DataFrame = {

    val df = dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))

    val df2 = df1.filter($"Diverted" === 0)

    val df3 = df2.drop(forbiddenVariables: _*)

    val df4 = df3.withColumn("DepHour", hourExtractorUdf($"DepTime"))

    val df5 = df4.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))

    df5

}

假设我这样做是为了在一个转换与另一个转换之间进行记录。

好的。第二个选择。如果我使用 var 而不是 val 会怎样?

override def transform(dataset: Dataset[_]): DataFrame = {

    var df = dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))

    df = df.filter($"Diverted" === 0)

    df = df.drop(forbiddenVariables: _*)

    df = df.withColumn("DepHour", hourExtractorUdf($"DepTime"))

    df = df.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))

    df

}

我想现在在整个过程中我没有在内存中加载 5 个 DataFrame。对吗?

最后,下一个选项怎么样,它是否更节省内存?

override def transform(dataset: Dataset[_]): DataFrame = {

    dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))
      .filter($"Diverted" === 0)
      .drop(forbiddenVariables: _*)
      .withColumn("DepHour", hourExtractorUdf($"DepTime"))
      .select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))

}

当然,我假设没有任何选项比其他选项的计算成本更高。

最佳答案

代码的所有版本都是等效的,因为它们最终会生成相同的数据帧并且不会产生任何副作用。对于 Spark 的工作原理似乎存在一些根本性的误解。数据帧不包含数据。它们只是一个执行计划。

在学习 Spark 时,我们经常讨论“转换”和“ Action ”之间的区别。

转换会修改数据,例如 filterselectdrop 以及修改数据帧的任何其他方法。 “转变”做零工作,他们只是建立执行计划。

另一方面,操作实际上会产生一些可见的效果。这些操作包括保存到文件、将结果收集到驱动程序或使用 foreach 消耗数据。仅当调用操作时,才会评估您的数据框并运行转换。

1GB 的数据也很小,如果这就是你的全部,也许你需要重新考虑是否真的需要使用 Spark。

关于scala - Apache Spark : Transforming large DataFrame efficiently,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47556542/

相关文章:

scala - 为什么在调用 timer.schedule 时主应用程序不退出

scala - Scala:隐式转换不起作用

scala - 字段 "features"不存在。 Spark ML

python - 在 spark(python)中通过 MapReduce 理解分组

python - 在 UDF 的 withColumn 之后,运行 count() 给出 TypeError : 'NoneType' object is not subscriptable

json - spark读取json中的重复列

javascript - 在 Java 中访问 JavaScript 对象的字段

pandas - 将 spark DataFrame 转换为 pandas DF

scala - Spark : join method with generic RDD

python - pyspark 在将 rdd 转换为数据帧时对 mapPartitions 使用一个任务