apache-spark - 用户定义的函数要应用于 PySpark 中的 Window?

标签 apache-spark pyspark aggregate-functions user-defined-functions window-functions

我正在尝试将用户定义的函数应用于 PySpark 中的 Window。我已经读过 UDAF 可能是要走的路,但我找不到任何具体的东西。

举个例子(取自这里:Xinh's Tech Blog 并针对 PySpark 进行了修改):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()

a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])

customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
                                    ["Alice", "2016-05-03", 45.00],
                                    ["Alice", "2016-05-04", 55.00],
                                    ["Bob", "2016-05-01", 25.00],
                                    ["Bob", "2016-05-04", 29.00],
                                    ["Bob", "2016-05-06", 27.00]],
                               ["name", "date", "amountSpent"])

customers.show()

window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))

result.show()

我正在申请 avg已内置于 pyspark.sql.functions , 但如果不是 avg我想使用更复杂的东西并编写自己的函数,我该怎么做?

最佳答案

Spark >= 3.0 :

SPARK-24561 - 带有 Pandas udf(有界窗口)的用户定义窗口函数正在进行中。详情请关注相关JIRA。

Spark >= 2.4 :

SPARK-22239 - 带有 Pandas udf(无界窗口)的用户定义窗口函数引入了对基于 Pandas 的带有无界窗口的窗口函数的支持。一般结构是

return_type: DataType

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
  return ... 

w = (Window
    .partitionBy(grouping_column)
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))

有关详细示例,请参阅 doctestsunit tests

Spark < 2.4

你不能。窗口函数需要 UserDefinedAggregateFunction 或等效对象,而不是 UserDefinedFunction ,并且不可能在 PySpark 中定义一个。

但是,在 PySpark 2.3 或更高版本中,您可以定义矢量化 pandas_udf ,它可以应用于分组数据。您可以找到一个工作示例 Applying UDFs on GroupedData in PySpark (with functioning python example) 。虽然 Pandas 不提供窗口函数的直接等价物,但有足够的表现力来实现任何类似窗口的逻辑,尤其是 pandas.DataFrame.rolling 。此外,与 GroupedData.apply 一起使用的函数可以返回任意数量的行。

您还可以从 PySpark Spark: How to map Python with Scala or Java User Defined Functions? 调用 Scala UDAF。

关于apache-spark - 用户定义的函数要应用于 PySpark 中的 Window?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48292077/

相关文章:

mysql - 组数

python - 使用 Python 的 reduce() 连接多个 PySpark DataFrame

mysql select sql 很慢,哪个索引丢了

postgresql - Spark 找不到 postgres jdbc 驱动程序

mysql - 在 shell 脚本中运行 spark-shell 命令

python - 如何使用 pyspark 收集两个连续日期之间的新 ID 列表

pandas - pyspark中的java.lang.OutOfMemoryError

mysql - 如何从使用用户变量作为计数器的查询中获得正确的结果?

java - Spark : print dataframe in Java

python - 在 PySpark 中应用自定义函数时使用外部模块