我有一个 1GB 的 csv 文件,我使用 DataFrame API 加载该文件。我还实现了一个自定义 Transformer
来准备数据,以便可以由 Estimator
进行处理。
transform
方法正在执行一些不同的操作:
- 类型转换柱。
- 过滤行。
- 删除列。
- 创建新列,对其他列应用函数。
我担心此过程中的内存使用情况。如果在每次转换后将结果存储在变量中会发生什么?例如(简化):
override def transform(dataset: Dataset[_]): DataFrame = {
val df = dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))
val df2 = df1.filter($"Diverted" === 0)
val df3 = df2.drop(forbiddenVariables: _*)
val df4 = df3.withColumn("DepHour", hourExtractorUdf($"DepTime"))
val df5 = df4.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))
df5
}
假设我这样做是为了在一个转换与另一个转换之间进行记录。
好的。第二个选择。如果我使用 var
而不是 val
会怎样?
override def transform(dataset: Dataset[_]): DataFrame = {
var df = dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))
df = df.filter($"Diverted" === 0)
df = df.drop(forbiddenVariables: _*)
df = df.withColumn("DepHour", hourExtractorUdf($"DepTime"))
df = df.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))
df
}
我想现在在整个过程中我没有在内存中加载 5 个 DataFrame。对吗?
最后,下一个选项怎么样,它是否更节省内存?
override def transform(dataset: Dataset[_]): DataFrame = {
dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))
.filter($"Diverted" === 0)
.drop(forbiddenVariables: _*)
.withColumn("DepHour", hourExtractorUdf($"DepTime"))
.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))
}
当然,我假设没有任何选项比其他选项的计算成本更高。
最佳答案
代码的所有版本都是等效的,因为它们最终会生成相同的数据帧并且不会产生任何副作用。对于 Spark 的工作原理似乎存在一些根本性的误解。数据帧不包含数据。它们只是一个执行计划。
在学习 Spark 时,我们经常讨论“转换”和“ Action ”之间的区别。
转换会修改数据,例如 filter
、select
、drop
以及修改数据帧的任何其他方法。 “转变”做零工作,他们只是建立执行计划。
另一方面,操作实际上会产生一些可见的效果。这些操作包括保存到文件、将结果收集到驱动程序或使用 foreach 消耗数据。仅当调用操作时,才会评估您的数据框并运行转换。
1GB 的数据也很小,如果这就是你的全部,也许你需要重新考虑是否真的需要使用 Spark。
关于scala - Apache Spark : Transforming large DataFrame efficiently,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47556542/