python - Pyspark - 调用 pandas_udf 时出错,返回 Series.interpolate() 作为结果

标签 python pandas pyspark user-defined-functions

我正在尝试创建一个返回 interpolation 的 UDF函数,但该函数返回一个带有索引的系列并抛出异常。

from pyspark.sql.types import FloatType

@F.pandas_udf(FloatType(), F.PandasUDFType.GROUPED_AGG)
def udf_interpolate(v):
  return v.interpolate('linear')

## Test data
df = spark.createDataFrame([
    ("charles", 1),
    ("charles", None),
    ("charles", 3),
], ["name", "value"])

window = Window.partitionBy('name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('test_interp', udf_interpolate(df.value).over(window)).show()

错误信息:

pyarrow.lib.ArrowInvalid: Could not convert 0    3.0
1    2.0
2    1.0
Name: _0, dtype: float64 with type Series: tried to convert to float32

我尝试强制转换为 float32,但错误仍然存​​在。我最初的想法是因为我返回一个“预期一个值”中包含多个值的系列,但我不知道如何解决这个问题。

如果我更改函数,例如返回 v.mean(),效果很好。

感谢任何帮助。

谢谢。

最佳答案

GROUPED_AGG requires the UDF to return a scalar ;在您的情况下,最好使用GROUPED_MAP,因为您要返回一个系列并且需要按组执行计算;本质上,您将每个名称的子数据帧传递给 pandas_udf,使用 pandas API 对其进行转换,然后返回转换后的数据帧:

@F.pandas_udf(df.schema, F.PandasUDFType.GROUPED_MAP)
def udf_interpolate(g):
    return g.assign(value=g.value.interpolate('linear'))

df.groupby('name').apply(udf_interpolate).show()
+-------+-----+                                                                 
|   name|value|
+-------+-----+
|charles|    1|
|charles|    2|
|charles|    3|
+-------+-----+

关于python - Pyspark - 调用 pandas_udf 时出错,返回 Series.interpolate() 作为结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54563222/

相关文章:

python - 在 YARN 上运行的 Spark 如何计算 Python 内存使用量?

python - 将类函数传递给 PySpark RDD

Python LDIF 解析器

python - 从范围在python中创建字典对象

apache-spark - GLM with Apache Spark 2.2.0 - Tweedie 系列默认链接值

python - Pandas Dataframe.to_csv - 将变量值插入到 csv 文件的开头

python - 在 Pandas 中使用滚动的滑动窗口迭代器

python - Django - 如何在发布表单时添加用户的 IP 地址

python - geodjango 检查 PolygonField 中的 PointField

python - 根据已过滤的 pandas 数据帧绘制定义的 x/y 范围的 pcolormesh,即使该行或列不存在于已过滤的数据帧中