apache-spark - 高效的pyspark连接

标签 apache-spark pyspark

我已经阅读了很多关于如何在 pyspark 中进行高效连接的文章。我发现的实现高效连接的方法基本上是:

  • 如果可以,请使用广播连接。 ( 我通常不能 因为数据帧太大)
  • 考虑使用非常大的集群。 (我宁愿不是因为 $$$ )。
  • 使用相同的分区器。

  • 最后一个是我宁愿尝试的,但我找不到在 pyspark 中做到这一点的方法。我试过了:
    df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
    

    但这无济于事,直到我停止它仍然需要很长时间,因为 spark get 卡在最后几项工作中。

    那么,我如何在 pyspark 中使用相同的分区器并加速我的连接,甚至摆脱永远需要的洗牌?我需要使用哪个代码?

    PD : 我查看过其他文章,甚至在 stackoverflow 上,但我还是看不到代码。

    最佳答案

    您也可以使用两遍方法,以防它适合您的要求。首先,重新分区数据并使用分区表 (dataframe.write.partitionBy()) 进行持久化。然后,在循环中串行连接子分区,“附加”到同一个最终结果表。
    Sim很好地解释了这一点。见下面的链接

    two pass approach to join big dataframes in pyspark

    基于上面解释的案例,我能够在循环中串行连接子分区,然后将连接的数据持久化到配置单元表。

    这是代码。

    from pyspark.sql.functions import *
    emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
    emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")
    

    因此,如果您加入一个整数 emp_id,您可以通过 ID 取模某个数字进行分区,这样您就可以在 spark 分区之间重新分配负载,并且具有相似键的记录将被分组在一起并驻留在同一分区上。
    然后,您可以读取并循环遍历每个子分区数据,并将两个数据帧连接起来并将它们保存在一起。
    counter =0;
    paritioncount = 4;
    while counter<=paritioncount:
        query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
        query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
        EMP_DF1 =spark.sql(query1)
        EMP_DF2 =spark.sql(query2)
        df1 = EMP_DF1.alias('df1')
        df2 = EMP_DF2.alias('df2')
        innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
        innerjoin_EMP.show()
        innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
        counter = counter +1
    

    我试过这个,这工作正常。这只是演示两遍方法的示例。您的加入条件可能会有所不同,分区数量也取决于您的数据大小。

    关于apache-spark - 高效的pyspark连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53524062/

    相关文章:

    r - 初始化 sparkR : JVM is not ready after 10 seconds 时出错

    azure - org.postgresql.util.PSQLException : SSL error: Received fatal alert: handshake_failure while writing from Azure Databricks to Azure Postgres Citus

    hadoop - EMR Hue : CUSTOM server authentication not supported. 有效的是 ['NONE' , 'KERBEROS' , 'PAM' , 'NOSASL' , 'LDAP' ]

    python - 使用 Python 将 Dask Dataframe 转换为 Spark Dataframe

    python - 如何在 PySpark 中只打印 DataFrame 的某一列?

    java - 根据java中的时间戳按月对spark数据集进行分组

    apache-spark - 每行计算并在 DataFrame PySpark 中添加新列 - 更好的解决方案?

    scala - 使用 Scala 类作为 UDF 与 pyspark

    python - 从 Databricks 笔记本向 Azure Eventhubs 发送 Spark 数据帧时出现错误 (java.lang.NoSuchMethodError)

    list - Pyspark 多个列表中每个元素的平均值