Spark Dataframes 有一个方法 withColumn
一次添加一个新列。要添加多列,一串withColumn
s 是必需的。这是执行此操作的最佳做法吗?
我觉得用mapPartitions
有更多的优势。假设我有一个由三个组成的链 withColumn
s 然后删除一个过滤器 Row
s 基于特定条件。这是四种不同的操作(不过,我不确定其中是否有任何一种是广泛的转换)。但是如果我做一个 mapPartitions
,我可以一次性完成所有工作.如果我有一个我希望每个 RDD 分区打开一次的数据库连接,它也会有所帮助。
我的问题有两个部分。
第一部分,这是我对 mapPartitions 的实现。这种方法有什么不可预见的问题吗?有没有更优雅的方法来做到这一点?
df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
第二部分,使用
mapPartitions
的权衡是什么?在一连串的 withColumn
上和 filter
?我在某处读到使用 Spark DF 的可用方法总是比推出您自己的实现更好。如果我的论点有误,请告诉我。谢谢!欢迎所有想法。
最佳答案
Are there any unforeseen issues with this approach?
多种的。最严重的影响是:
DataFrame
相比,内存占用高出几倍代码和大量的垃圾收集开销。 toDF
上的模式推断成本调用(如果提供了适当的架构,可以避免)和可能重新执行所有前面的步骤。 其中一些可以通过
udf
避免和 select
/withColumn
,其他不能。let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions
您的
mapPartitions
不删除任何操作,也不提供任何优化,Spark planner 不能排除。它唯一的优点是它为昂贵的连接对象提供了一个很好的范围。I read somewhere that using the available methods with Spark DFs are always better than rolling out your own implementation
当您开始使用执行器端 Python 逻辑时,您已经与 Spark SQL 不同了。如果你使用
udf
没关系, RDD
或新添加的矢量化 udf。在一天结束时,您应该根据代码的整体结构做出决定 - 如果主要是直接在数据上执行的 Python 逻辑,最好坚持使用 RDD
或完全跳过 Spark。如果这只是逻辑的一小部分,并且不会导致严重的性能问题,请不要担心。
关于apache-spark - PySpark:向 DataFrame 添加更多列的最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49651049/