我已经阅读了很多关于如何在 pyspark 中进行高效连接的文章。我发现的实现高效连接的方法基本上是:
最后一个是我宁愿尝试的,但我找不到在 pyspark 中做到这一点的方法。我试过了:
df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
但这无济于事,直到我停止它仍然需要很长时间,因为 spark get 卡在最后几项工作中。
那么,我如何在 pyspark 中使用相同的分区器并加速我的连接,甚至摆脱永远需要的洗牌?我需要使用哪个代码?
PD : 我查看过其他文章,甚至在 stackoverflow 上,但我还是看不到代码。
最佳答案
您也可以使用两遍方法,以防它适合您的要求。首先,重新分区数据并使用分区表 (dataframe.write.partitionBy()) 进行持久化。然后,在循环中串行连接子分区,“附加”到同一个最终结果表。
Sim很好地解释了这一点。见下面的链接
two pass approach to join big dataframes in pyspark
基于上面解释的案例,我能够在循环中串行连接子分区,然后将连接的数据持久化到配置单元表。
这是代码。
from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")
因此,如果您加入一个整数 emp_id,您可以通过 ID 取模某个数字进行分区,这样您就可以在 spark 分区之间重新分配负载,并且具有相似键的记录将被分组在一起并驻留在同一分区上。
然后,您可以读取并循环遍历每个子分区数据,并将两个数据帧连接起来并将它们保存在一起。
counter =0;
paritioncount = 4;
while counter<=paritioncount:
query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
EMP_DF1 =spark.sql(query1)
EMP_DF2 =spark.sql(query2)
df1 = EMP_DF1.alias('df1')
df2 = EMP_DF2.alias('df2')
innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
innerjoin_EMP.show()
innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
counter = counter +1
我试过这个,这工作正常。这只是演示两遍方法的示例。您的加入条件可能会有所不同,分区数量也取决于您的数据大小。
关于apache-spark - 高效的pyspark连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53524062/