我写了一个UDF。它非常慢。我想用 pandas_udf 替换它以利用矢量化。
实际的 udf 有点复杂,但我创建了一个简化的玩具版本。
我的问题:是否可以将玩具示例中的 UDF 替换为可以利用矢量化的 pandas_udf?如果不是,为什么不呢?
P.S:我知道我可以在没有 UDF 的情况下达到同样的效果。那是因为我简化了示例,但这不是我的目标。
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType
import pandas as pd
#Example data
df = spark.createDataFrame(pd.DataFrame({ 'Letter': [['A', 'A', 'C'], ['A', 'C', 'A', 'D']],
'Number': [[2, 1, 1], [3, 1, 1, 2]],
})
)
# The UDF I hope to replace with a pandas_udf
@f.udf(ArrayType(StringType()))
def array_func(le, nr):
res=[]
for i in range(len(nr)):
if nr[i]==1:
res.append(le[i])
else:
res.append('Nope')
return res
# Applying the udf
df = df.withColumn('udf', array_func('Letter','Number'))
df.show()
最佳答案
这个怎么样?
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
import pandas as pd
#Example data
df = spark.createDataFrame(pd.DataFrame({ 'Letter': [['A', 'A', 'C'], ['A', 'C', 'A', 'D']],
'Number': [[2, 1, 1], [3, 1, 1, 2]],
})
)
df.show()
# Add a dummy column so you can use groupby
df = df.withColumn('id', F.lit(1))
schm = StructType(df.schema.fields + [StructField('udf', ArrayType(StringType()), True)])
@pandas_udf(schm, PandasUDFType.GROUPED_MAP)
def array_udf(pdf):
res=[]
for ls, ns in zip(pdf['Letter'], pdf['Number']):
r = [l if n == 1 else 'Nope' for l, n in zip(ls, ns)]
res.append(r)
pdf['udf'] = res
return pdf
df = df.groupby('id').apply(array_udf).drop('id')
df.show()
输出:
+------------+------------+------------------+
| Letter| Number| udf|
+------------+------------+------------------+
| [A, A, C]| [2, 1, 1]| [Nope, A, C]|
|[A, C, A, D]|[3, 1, 1, 2]|[Nope, C, A, Nope]|
+------------+------------+------------------+
关于arrays - pandas_udf 对两个 ArrayType(StringType()) 字段进行操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57807336/