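I am trying to optimize the code below (a PySpark UDF).
It gives me the desired result (based on my data set), but it is too slow on very large datasets (approx. 180M).
The results (accuracy) are better than those of the available Python modules (e.g. geotext, hdx-python-country), so I am not looking for another module.
DataFrame: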
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"]
]).toDF("address","name")
regex.csv:
iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$
......<many, many more>
Create a Pandas DataFrame from regex.csv, group by iso2 and join the keywords with | (e.g. \bArizona\b|\bTexas\b|\bFlorida\b|\bUS$):
df = pd.read_csv('regex.csv', sep=';')
df_regex = df.groupby('iso2').agg({'keywords': '|'.join }).reset_index()
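The grouping step can also be sketched with the standard library alone, which makes it easy to see what one joined pattern looks like. This is only an illustration: the inline `SAMPLE_CSV` string and the helper name `group_patterns` are assumptions standing in for regex.csv and the pandas groupby.

```python
import csv
import io
from collections import defaultdict

# Hypothetical inline stand-in for regex.csv (a few rows from the sample above).
SAMPLE_CSV = r"""iso2;keywords
US;\bTexas\b
US;\bUS$
NL;\bAmsterdam\b
NL;\bNL$
"""

def group_patterns(text):
    """Join all keywords of one iso2 code into a single alternation."""
    grouped = defaultdict(list)
    for row in csv.DictReader(io.StringIO(text), delimiter=";"):
        grouped[row["iso2"]].append(row["keywords"])
    return {iso2: "|".join(keywords) for iso2, keywords in grouped.items()}

patterns = group_patterns(SAMPLE_CSV)
print(patterns["US"])
```

Each country ends up with one alternation, so an address is scanned once per country rather than once per keyword.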
Function:
def get_iso2(x):
    iso2 = {}
    for j, row in df_regex.iterrows():
        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)
        for m in matches:
            # count one hit per match for this country
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
    return [key for key, value in iso2.items() for _ in range(value)]
PySpark UDF:
get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))
Create the new column:
df_new = df.withColumn('iso2', get_iso2_udf('address'))
Expected sample output:
[US,US,NL]
[CA]
[BE,BE,AU]
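Some places occur in more than one country (the input is an address column with city, province, state, country, ...). Samples:
3030 Whispering Pines Circle, Prosper Texas, US -> [US,US,US]
Kalverstraat Amsterdam -> [US,NL]
Kalverstraat Amsterdam, Netherlands -> [US, NL, NL]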
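The sample outputs can be reproduced without Spark, which gives a small reference to compare any optimized version against. This is a sketch under assumptions: `PATTERNS` is a hand-picked subset of regex.csv and `count_countries` is a hypothetical name; the patterns are compiled once up front instead of inside the per-row function.

```python
import re

# Assumed subset of regex.csv, already joined per country (insertion order kept).
PATTERNS = {
    "US": r"\bTexas\b|\bAmsterdam\b|\bProsper\b|\bUS$",
    "NL": r"\bAmsterdam\b|\bNetherlands\b|\bNL$",
}
# Compile once, outside the per-row function.
COMPILED = {iso2: re.compile(p, re.I | re.M) for iso2, p in PATTERNS.items()}

def count_countries(address):
    """Return each iso2 code once per regex match in the address."""
    result = []
    for iso2, rx in COMPILED.items():
        result.extend([iso2] * len(rx.findall(address)))
    return result

print(count_countries("Kalverstraat Amsterdam, Netherlands"))
```

The order of codes in the result follows the order of the dictionary, so it may differ from a version that sorts by iso2.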
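Maybe using a Scala UDF in PySpark is an option, but I do not know how. Your optimization suggestions are highly appreciated!
Best answer
IIUC, you can try the following steps without using a UDF: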
from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"],
["xvcv", "ddd"]
]).toDF("address","name")
Step 1: convert df_regex to a Spark DataFrame df1 and add a unique id to df.
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
# adjust keywords to uppercase except chars preceded with backslash:
# regex=True is required for a callable replacement on recent pandas versions
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)
# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()
df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords |
+----+---------------------------------------------------------------------------------+
|CA |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$ |
|NL |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$ |
|US |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+
df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0 |
|Kalverstraat Amsterdam |Mary|1 |
|Kalverstraat Amsterdam, Netherlands |Lex |2 |
|xvcv |ddd |3 |
+-----------------------------------------------+----+---+
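The keyword-uppercasing regex in Step 1 is easier to follow on a single string. The sketch below applies the same pattern and replacement function with plain `re.sub`; the helper name `upper_keyword` is an assumption for illustration, and the behaviour shown relies on Python 3.7+ handling of empty matches.

```python
import re

def upper_keyword(keyword):
    """Uppercase a pattern but leave each backslash escape untouched."""
    return re.sub(
        r'(^|\\.)([^\\]*)',  # group 1: start or one escape; group 2: plain text after it
        lambda m: m.group(1) + m.group(2).upper(),
        keyword,
    )

print(upper_keyword(r'\bNova Scotia\b'))
print(upper_keyword(r'\bUS$'))
```

Only the text between escapes is uppercased, so an anchor such as \b is not turned into \B, which would change its meaning.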
Step 2: use rlike to left-join df1 (the Spark version of df_regex) to df:
df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
| address|name| id|iso2| keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...|
| xvcv| ddd| 3|null| null|
+--------------------+----+---+----+--------------------+
Step 3: count the number of matches of d2.keywords in d1.address by splitting d1.address on d2.keywords and then subtracting 1 from the size of the resulting array:
df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| -2|
+--------------------+----+---+----+--------------------+-----------+
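The split-and-count trick from Step 3 can be checked in plain Python. Note that this is only an analogue: Spark's split uses Java regular expressions, while the sketch uses Python's `re`, and the shortened `PATTERN` is an assumption. In Python the equivalence holds only when the pattern has no capturing groups and never matches the empty string.

```python
import re

# Assumed, shortened version of the US pattern from df1.
PATTERN = r'(?m)\bARIZONA\b|\bTEXAS\b|\bPROSPER\b|\bUS$'
address = "3030 Whispering Pines Circle, Prosper Texas, US".upper()

# n matches cut the string into n + 1 pieces.
pieces = re.split(PATTERN, address)
num_matches = len(pieces) - 1

# A string without any match stays in one piece, giving 0.
no_match = len(re.split(PATTERN, "XVCV")) - 1
print(num_matches, no_match)
```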
Step 4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+):
df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
| address|name| id| iso2| keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John| 0|[US, US, US]|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| [NL]|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| [US]|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| [NL, NL]|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| [US]|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3| []| null| -2|
+--------------------+----+---+------------+--------------------+-----------+
Step 5: groupby and do the aggregation:
df_new = df4 \
    .groupby('id') \
    .agg(
        first('address').alias('address'),
        first('name').alias('name'),
        flatten(collect_list('iso2')).alias('countries')
    )
+---+--------------------+----+------------+
| id| address|name| countries|
+---+--------------------+----+------------+
| 0|3030 Whispering P...|John|[US, US, US]|
| 1|Kalverstraat Amst...|Mary| [NL, US]|
| 3| xvcv| ddd| []|
| 2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+
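What Steps 4 and 5 do to the rows can be mimicked in plain Python. The sketch below is an analogue only, not Spark code: `rows` is a hypothetical copy of the (id, iso2, num_matches) columns shown for df3, and `repeat_and_flatten` is an illustrative name.

```python
from itertools import chain

# Hypothetical rows mirroring df3: (id, iso2, num_matches).
rows = [
    (0, "US", 3), (1, "NL", 1), (1, "US", 1),
    (2, "NL", 2), (2, "US", 1), (3, None, -2),
]

def repeat_and_flatten(rows):
    per_id = {}
    for row_id, iso2, n in rows:
        # Repeat the code n times; a non-positive count gives an empty list,
        # matching the [] shown for the unmatched row in the df4 table.
        per_id.setdefault(row_id, []).append([iso2] * max(n, 0))
    # Concatenate the per-country lists of each id into one flat list.
    return {k: list(chain.from_iterable(v)) for k, v in per_id.items()}

countries = repeat_and_flatten(rows)
print(countries)
```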
Alternative: Step 3 can also be handled by a Pandas UDF:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re
@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    # rows without a join partner have a null pattern and count as 0
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])
df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| 0|
+--------------------+----+---+----+--------------------+-----------+
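The per-row logic of the Pandas UDF alternative can be tried on its own. In this sketch `num_matches` is a hypothetical helper and `NL` is the pattern copied from the df1 table; it shows why the unmatched row now gets 0 instead of -2, and why the address has to be uppercased first.

```python
import re

def num_matches(pattern, text):
    """Count regex matches; a missing pattern (no join partner) counts as 0."""
    return 0 if pattern is None else len(re.findall(pattern, text))

NL = r'(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$'

upper_hits = num_matches(NL, "Kalverstraat Amsterdam, Netherlands".upper())
raw_hits = num_matches(NL, "Kalverstraat Amsterdam, Netherlands")  # not uppercased
null_hits = num_matches(None, "XVCV")
print(upper_hits, raw_hits, null_hits)
```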
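Notes:
All characters of the keywords (except anchors and other escaped characters such as \b, \B, \A, \z) are converted to uppercase.
The patterns used in rlike and regexp_replace are Java-based, while those used in pandas_udf are Python-based, so there may be slight differences when setting up the patterns in regex.csv.
Method 2: using pandas_udf
Since join and groupby trigger data shuffling, the method above can be slow. Here is one more option for your testing: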
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)
df_ptn = spark.sparkContext.broadcast(
df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}
# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
import re
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten
def __get_iso2(addr, ptn):
    # build one [iso2]*count list per country, then concatenate them per address
    return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()], []) for s in addr ])
get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+
Or return an array of arrays from the pandas_udf (without reduce and iconcat) and do the flatten with Spark:
def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])
get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)
df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()
Update: to find unique countries, do the following:
def __get_iso2_3(addr, ptn):
    return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])
get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
| address|name| iso2|
+--------------------+----+--------+
|3030 Whispering P...|John| [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
| xvcv| ddd| []|
+--------------------+----+--------+
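The unique-country variant only needs to know whether a pattern matches at all, so `search` can stop at the first hit instead of collecting every match. A minimal plain-Python sketch, assuming a shortened `PATTERNS` dictionary and the illustrative name `unique_countries`:

```python
import re

# Assumed, shortened patterns; compiled once and reused for every address.
PATTERNS = {
    "NL": re.compile(r'(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$'),
    "US": re.compile(r'(?m)\bTEXAS\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$'),
}

def unique_countries(address):
    """List each iso2 code at most once if its pattern matches the address."""
    upper = address.upper()
    return [iso2 for iso2, rx in PATTERNS.items() if rx.search(upper)]

print(unique_countries("Kalverstraat Amsterdam, Netherlands"))
```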
Method 3: using a list comprehension
Similar to @CronosNull's method, if the list in regex.csv is manageable, you can handle it with a list comprehension:
from pyspark.sql.functions import size, split, upper, col, array, expr, flatten
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])
df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+
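To see what Method 3 hands to Spark, the expression strings built by the second list comprehension can be printed without a Spark session. The list `codes` is an assumption standing in for `df_ptn.keys()`.

```python
# Assumed country codes, standing in for df_ptn.keys().
codes = ["CA", "NL", "US"]

# One SQL expression per country: repeat the literal code as many times
# as the count stored in the column of the same name (hence the backticks).
exprs = ["array_repeat('{0}',`{0}`)".format(c) for c in codes]

for e in exprs:
    print(e)
```

Each country becomes its own count column and its own expression, which is why this approach is only practical while the number of countries in regex.csv stays manageable.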
Regarding python-3.x - PySpark UDF optimization challenge using a dictionary with regexes (Scala?), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/63369936/