我正在尝试将用户定义的函数应用于 PySpark 中的 Window。我已经读过 UDAF 可能是要走的路,但我找不到任何具体的东西。
举个例子(取自这里:Xinh's Tech Blog 并针对 PySpark 进行了修改):
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])
customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
["Alice", "2016-05-03", 45.00],
["Alice", "2016-05-04", 55.00],
["Bob", "2016-05-01", 25.00],
["Bob", "2016-05-04", 29.00],
["Bob", "2016-05-06", 27.00]],
["name", "date", "amountSpent"])
customers.show()
window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))
result.show()
我正在申请
avg
已内置于 pyspark.sql.functions
, 但如果不是 avg
我想使用更复杂的东西并编写自己的函数,我该怎么做?
最佳答案
Spark >= 3.0 :
SPARK-24561 - 带有 Pandas udf(有界窗口)的用户定义窗口函数正在进行中。详情请关注相关JIRA。
Spark >= 2.4 :
SPARK-22239 - 带有 Pandas udf(无界窗口)的用户定义窗口函数引入了对基于 Pandas 的带有无界窗口的窗口函数的支持。一般结构是
return_type: DataType
@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
return ...
w = (Window
.partitionBy(grouping_column)
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn('foo', f('bar').over(w))
有关详细示例,请参阅 doctests 和 unit tests。
Spark < 2.4
你不能。窗口函数需要
UserDefinedAggregateFunction
或等效对象,而不是 UserDefinedFunction
,并且不可能在 PySpark 中定义一个。但是,在 PySpark 2.3 或更高版本中,您可以定义矢量化
pandas_udf
,它可以应用于分组数据。您可以找到一个工作示例 Applying UDFs on GroupedData in PySpark (with functioning python example) 。虽然 Pandas 不提供窗口函数的直接等价物,但有足够的表现力来实现任何类似窗口的逻辑,尤其是 pandas.DataFrame.rolling
。此外,与 GroupedData.apply
一起使用的函数可以返回任意数量的行。您还可以从 PySpark Spark: How to map Python with Scala or Java User Defined Functions? 调用 Scala UDAF。
关于apache-spark - 用户定义的函数要应用于 PySpark 中的 Window?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48292077/