apache-spark - PySpark:向 DataFrame 添加更多列的最佳实践

标签 apache-spark pyspark apache-spark-sql pyspark-sql

Spark Dataframes 有一个方法 withColumn一次添加一个新列。要添加多列,一串withColumn s 是必需的。这是执行此操作的最佳做​​法吗?

我觉得用mapPartitions有更多的优势。假设我有一个由三个组成的链 withColumn s 然后删除一个过滤器 Row s 基于特定条件。这是四种不同的操作(不过,我不确定其中是否有任何一种是广泛的转换)。但是如果我做一个 mapPartitions,我可以一次性完成所有工作.如果我有一个我希望每个 RDD 分区打开一次的数据库连接,它也会有所帮助。

我的问题有两个部分。

第一部分,这是我对 mapPartitions 的实现。这种方法有什么不可预见的问题吗?有没有更优雅的方法来做到这一点?

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

def add_new_cols(rows):
    db = open_db_connection()
    new_rows = []
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

第二部分,使用 mapPartitions 的权衡是什么?在一连串的 withColumn 上和 filter ?

我在某处读到使用 Spark DF 的可用方法总是比推出您自己的实现更好。如果我的论点有误,请告诉我。谢谢!欢迎所有想法。

最佳答案

Are there any unforeseen issues with this approach?



多种的。最严重的影响是:
  • 与普通 DataFrame 相比,内存占用高出几倍代码和大量的垃圾收集开销。
  • 在执行上下文之间移动数据所需的序列化和反序列化成本很高。
  • 在查询计划器中引入断点。
  • 照原样,toDF 上的模式推断成本调用(如果提供了适当的架构,可以避免)和可能重新执行所有前面的步骤。
  • 等等...

  • 其中一些可以通过 udf 避免和 select/withColumn ,其他不能。

    let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions



    您的 mapPartitions不删除任何操作,也不提供任何优化,Spark planner 不能排除。它唯一的优点是它为昂贵的连接对象提供了一个很好的范围。

    I read somewhere that using the available methods with Spark DFs are always better than rolling out your own implementation



    当您开始使用执行器端 Python 逻辑时,您已经与 Spark SQL 不同了。如果你使用 udf 没关系, RDD或新添加的矢量化 udf。在一天结束时,您应该根据代码的整体结构做出决定 - 如果主要是直接在数据上执行的 Python 逻辑,最好坚持使用 RDD或完全跳过 Spark。

    如果这只是逻辑的一小部分,并且不会导致严重的性能问题,请不要担心。

    关于apache-spark - PySpark:向 DataFrame 添加更多列的最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49651049/

    相关文章:

    apache-spark - Spark SQL 配置

    macos - 在 Spark 和 Hadoop 之间共享数据(Mahout)

    java - Java 中的 Scala 方法重写

    python - 如何在 PySpark 中将字典转换为数据框?

    json - 如何在 spark 中写入有效的 json

    graph - 是否可以在大图中使用并行框架实现所有对最短路径算法?

    scala - 使用 UTF-8 编码在 Spark 中写入 CSV(德语字符)时出现问题

    pyspark - 如何在pyspark中获得每个PCA组件的解释方差

    python - PySpark 数据帧 : Change cell value based on min/max condition in another column

    java - Spark 从列中获取udf名称并执行