python - 我们如何在pyspark中使用dense_rank()函数?

标签 python apache-spark-sql pyspark

我正在运行 pyspark 脚本,在其中运行 sql 查询并创建数据帧。 在sql查询中有dense_rank()函数。由于此查询需要花费太多时间才能完全执行。

有什么方法可以快速执行查询,或者我们可以在 pyspark 级别处理这个问题吗? pyspark中是否有任何函数或方法可以替换sql中的dense_rank()?

SQL:

SELECT  DENSE_RANK() OVER(ORDER BY SOURCE_COLUMN_VALUE) AS SYSTEM_ID,SYSTEM_TABLE_NAME,SOURCE_ID,SOURCE_NAME,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME,SRC_VALUE AS SOURCE_COLUMN_VALUE,IM_INSERT_DT FROM (SELECT ID AS SOURCE_ID,'AMPIL' AS SOURCE_NAME,UPPER(concat(coalesce(addr_line_1,''),';',coalesce(addr_line_2,''),';',coalesce(city_1,''),';',coalesce(state_1,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,''))) as  SOURCE_COLUMN_VALUE,concat(coalesce(addr_line1_src,''),';',coalesce(addr_line2_src,''),';',coalesce(city_src,''),';',coalesce(state_crc,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,'')) as SRC_VALUE,SOURCE_TABLE_NAME,'ADDRESS' AS SYSTEM_TABLE_NAME,SOURCE_COLUMN_NAME,date_format(current_timestamp(),'yyyy-MM-dd hh:mm:ss') as IM_INSERT_DT from (SELECT ID,regexp_replace(addr_line_1,' ','') as addr_line_1,Upper(addr_line_1) as addr_line1_src,regexp_replace(addr_line_2,' ','') as addr_line_2 ,upper(addr_line_2) as addr_line2_src,regexp_replace(UPPER(coalesce(city,meli_city_nm)),' ','') as city_1,UPPER(coalesce(city,meli_city_nm)) as city_src,regexp_replace(coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)),' ','') as state_1, coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)) as state_crc,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then 'USA' when UPPER(coalesce(vw_states_code.country_cd,country)) = 'CANADA' then 'CA' else regexp_replace(UPPER(coalesce(vw_states_code.country_cd,country)),' ','') end as cntry_1,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then regexp_extract(substr(trim(regexp_replace(zip,' ','')),0,5),'^[0-9]{5}$',0) else regexp_replace(zip,' ','') end as zip_1,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME from vw_addr_stg LEFT JOIN (select * from vw_dmn_meli_zip where MELI_LAST_LN = 'L') vw_dmn_meli  on vw_addr_stg.zip=vw_dmn_meli.meli_zip_cd_base LEFT JOIN vw_states_code on (coalesce(meli_stt_provncd,state) = vw_states_code.state_cd or vw_states_code.state_nm = vw_addr_stg.state) LEFT JOIN vw_country_codes on vw_country_codes.country_name = vw_addr_stg.country))

最佳答案

pyspark中,您可以结合使用Window函数和SQL函数来获得您想要的结果。我不熟悉 SQL,也没有测试过该解决方案,但类似的东西可能会对您有所帮助:

import pyspark.sql.Window as psw
import pyspark.sql.functions as psf

w = psw.Window.partitionBy("SOURCE_COLUMN_VALUE")
df.withColumn("SYSTEM_ID", dense_rank().over(w))

您可以找到dense_rank here的文档

关于python - 我们如何在pyspark中使用dense_rank()函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61136483/

相关文章:

apache-spark - 如何访问结构体数组中的值?

apache-spark - 检查 DataFrame 在 PySpark 中是否有记录

python - 无法将 numpy 与 Spark 一起使用

python - zope.interface 可以定义类的 __init__ 方法应该是什么样子吗?

python - 使用 base64 库中的 b32decode 进行 base32 解码时忽略填充异常

java - Spark 在 UDF Java 中获取 WrappedArray<WrappedArray<Double>> 的值

apache-spark - 为什么 df.limit 在 Pyspark 中不断变化?

python - 避免在终端中输入 "python"来打开 .py 脚本?

python - 如何解析分层的 XML 字符串

scala - 将每个 json 行转换为表