python-3.x - PySpark UDF optimization challenge using a dictionary with regex (Scala?)

Tags: python-3.x regex scala pyspark user-defined-functions

I am trying to optimize the code below (a PySpark UDF).
It gives me the desired result (based on my dataset), but it is far too slow on a very large dataset (roughly 180M).
The results (accuracy) are better than those of the available Python modules (e.g. geotext, hdx-python-country), so I am not looking for another module.
The dataframe:

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")
regex.csv:
iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$

......<many, many more>
From regex.csv I create a Pandas DataFrame, group by iso2 and join the keywords (\bArizona\b|\bTexas\b|\bFlorida\b|\bUS$):
import re            # used later in get_iso2
import pandas as pd
df_csv = pd.read_csv('regex.csv', sep=';')   # separate name, so the Spark df above is not overwritten
df_regex = df_csv.groupby('iso2').agg({'keywords': '|'.join}).reset_index()
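As a quick sanity check (my own sketch, not part of the original post, assuming only the sample rows shown above), the grouped frame should hold one combined pattern per country:
print(df_regex.set_index('iso2')['keywords'].to_dict())
#{'CA': '\\bAlberta\\b|\\bNova Scotia\\b|\\bNova Scotia\\b|\\bWhitehorse\\b|\\bCA$',
# 'NL': '\\bAmsterdam\\b|\\bNetherlands\\b|\\bNL$',
# 'US': '\\bArizona\\b|\\bTexas\\b|\\bFlorida\\b|\\bChicago\\b|\\bAmsterdam\\b|\\bProsper\\b|\\bUS$'}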
The function:
def get_iso2(x):
    iso2 = {}
    # count how many of each country's keyword patterns match the address string
    for j, row in df_regex.iterrows():
        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
    # repeat each iso2 code once per match
    return [key for key, value in iso2.items() for _ in range(value)]
PySpark UDF:
from pyspark.sql import functions as F, types as T

get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))
Create the new column:
df_new = df.withColumn('iso2', get_iso2_udf('address'))
Expected sample output:
[US,US,NL]
[CA]
[BE,BE,AU]
Some places occur in multiple countries (the input is an address column containing city, province, state, country, ...).
Samples:
3030 Whispering Pines Circle, Prosper Texas, US -> [US, US, US]
Kalverstraat Amsterdam -> [US, NL]
Kalverstraat Amsterdam, Netherlands -> [US, NL, NL]
Maybe using a Scala UDF inside PySpark is an option, but I don't know how.
Any optimization suggestions are much appreciated!

Best answer

IIUC, you can try the following steps without using a UDF:

from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],
  ["Kalverstraat Amsterdam","Mary"],
  ["Kalverstraat Amsterdam, Netherlands","Lex"],
  ["xvcv", "ddd"]
]).toDF("address","name")
Step 1: convert df_regex into a Spark dataframe df1 and add a unique id to df.
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

# adjust keywords to uppercase except chars preceded with backslash:
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()

df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords                                                                         |
+----+---------------------------------------------------------------------------------+
|CA  |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$                             |
|NL  |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$                                          |
|US  |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+

df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address                                        |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0  |
|Kalverstraat Amsterdam                         |Mary|1  |
|Kalverstraat Amsterdam, Netherlands            |Lex |2  |
|xvcv                                           |ddd |3  |
+-----------------------------------------------+----+---+
Step 2: left-join df1 (the Spark version of df_regex) to df using rlike:
df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
|             address|name| id|iso2|            keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|
|                xvcv| ddd|  3|null|                null|
+--------------------+----+---+----+--------------------+
Step 3: count the matches of d2.keywords in d1.address by splitting d1.address on d2.keywords and then reducing the size of the resulting array by 1:
df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|         -2|
+--------------------+----+---+----+--------------------+-----------+
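To see the counting trick in isolation, here is a minimal sketch; the demo dataframe and the trimmed-down US pattern below are illustrative only, not part of the answer's pipeline:
from pyspark.sql.functions import split, size

demo = spark.createDataFrame([("3030 WHISPERING PINES CIRCLE, PROSPER TEXAS, US",)], ["addr"])
demo.select((size(split("addr", r"(?m)\bTEXAS\b|\bPROSPER\b|\bUS$")) - 1).alias("num_matches")).show()
# the pattern matches 3 times (PROSPER, TEXAS, trailing US), so split() yields
# 4 pieces and num_matches = 4 - 1 = 3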
Step 4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+):
df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
|             address|name| id|        iso2|            keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John|  0|[US, US, US]|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|        [NL]|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|        [US]|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|    [NL, NL]|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|        [US]|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|          []|                null|         -2|
+--------------------+----+---+------------+--------------------+-----------+
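A quick note on the unmatched "xvcv" row: its keywords column is null, so the split yields null, size(null) returns -1 (the default legacy sizeOfNull behaviour, consistent with the -2 shown above), and array_repeat with a non-positive count simply produces an empty array. The one-liner below is my own check of that last point, not part of the original answer:
spark.sql("SELECT array_repeat('US', -2) AS arr").show()
#+---+
#|arr|
#+---+
#| []|
#+---+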
Step 5: groupby and aggregate:
df_new = df4 \
    .groupby('id') \
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+
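For reference, steps 2 through 5 can also be chained into a single expression; this is just the code above composed, nothing new:
df_new = (df.alias('d1')
    .join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
    .withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
    .withColumn('iso2', expr("array_repeat(iso2, num_matches)"))
    .groupby('id')
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
    ))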
Alternative: Step 3 can also be handled by a pandas_udf:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re

@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])

df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|          0|
+--------------------+----+---+----+--------------------+-----------+
Notes:
  • Since case-insensitive pattern matching is expensive, we convert all characters of the keywords to uppercase, except for anchors or escaped characters such as \b, \B, \A, \z.
  • Just a reminder: the patterns used in rlike and regexp_replace are Java-based, while in a pandas_udf they are Python-based, so there could be slight differences when setting up the patterns in regex.csv.

  • Method 2: use pandas_udf
    Since using join and groupby triggers data shuffling, the above method could be slow. Here is one more alternative for your testing:
    df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
    
    df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
    
    df_ptn = spark.sparkContext.broadcast(
        df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
    )
    df_ptn.value
    #{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
    # 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
    # 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}
    
    # REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
    from operator import iconcat
    from functools import reduce
    from pandas import Series
    from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten
    import re
    
    def __get_iso2(addr, ptn):   
       return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])
    
    get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
    
    df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
    +--------------------+----+---+------------+
    |             address|name| id|        iso2|
    +--------------------+----+---+------------+
    |3030 Whispering P...|John|  0|[US, US, US]|
    |Kalverstraat Amst...|Mary|  1|    [NL, US]|
    |Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
    |                xvcv| ddd|  3|          []|
    +--------------------+----+---+------------+
    
    Or return an array of arrays from the pandas_udf (without reduce and iconcat) and then flatten it with Spark:
    def __get_iso2_2(addr, ptn):
        return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])
    
    get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)
    
    df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()
    
    Update: to find unique countries, do the following:
    def __get_iso2_3(addr, ptn):
      return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])
    
    get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
    
    df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
    +--------------------+----+--------+
    |             address|name|    iso2|
    +--------------------+----+--------+
    |3030 Whispering P...|John|    [US]|
    |Kalverstraat Amst...|Mary|[NL, US]|
    |Kalverstraat Amst...| Lex|[NL, US]|
    |                xvcv| ddd|      []|
    +--------------------+----+--------+
    
  • Method 3: use a list comprehension
    Similar to @CronosNull's approach, if the list of patterns from regex.csv is manageable, you can handle it with a list comprehension:
    from pyspark.sql.functions import size, split, upper, col, array, expr, flatten
    
    df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
    df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
    df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
    
    df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])
    
    df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
    +--------------------+----+---+------------+
    |             address|name| id|        iso2|
    +--------------------+----+---+------------+
    |3030 Whispering P...|John|  0|[US, US, US]|
    |Kalverstraat Amst...|Mary|  1|    [NL, US]|
    |Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
    |                xvcv| ddd|  3|          []|
    +--------------------+----+---+------------+
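    To make the intermediate step easier to follow (a hedged peek of my own, assuming the sample data above): df1 carries one integer column per iso2 code holding that country's match count, which array_repeat then expands into the iso2 arrays:
    df1.select('address', 'CA', 'NL', 'US').show(truncate=False)
    #+-----------------------------------------------+---+---+---+
    #|address                                        |CA |NL |US |
    #+-----------------------------------------------+---+---+---+
    #|3030 Whispering Pines Circle, Prosper Texas, US|0  |0  |3  |
    #|Kalverstraat Amsterdam                         |0  |1  |1  |
    #|Kalverstraat Amsterdam, Netherlands            |0  |2  |1  |
    #|xvcv                                           |0  |0  |0  |
    #+-----------------------------------------------+---+---+---+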
    

The original question, python-3.x - PySpark UDF optimization challenge using a dictionary with regex (Scala?), can be found on Stack Overflow: https://stackoverflow.com/questions/63369936/
