This answer很好地解释了如何使用 pyspark 的 groupby 和 pandas_udf 进行自定义聚合。但是,我不可能像示例的这一部分所示那样手动声明我的架构
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])
因为我将返回 100 多个具有自动生成的名称的列。有什么方法可以告诉 PySpark 隐含地使用我的函数返回的架构并假设所有工作节点都相同?该架构也会在运行期间发生变化,因为我将不得不尝试使用我想要使用的预测变量,因此架构生成的自动化过程可能是一个选项...
最佳答案
基于 Sanxofons comment,我对如何自己实现这个有了一个想法:
from pyspark.sql.types import *
mapping = {"float64": DoubleType,
"object":StringType,
"int64":IntegerType} # Incomplete - extend with your types.
def createUDFSchemaFromPandas(dfp):
column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
schema = StructType(column_types)
return schema
我所做的是获取样本 pandas df,将其传递给函数,然后查看返回的内容:
dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)
这似乎对我有用。问题是它有点递归(需要定义函数来获取模式,将模式定义为 udf)。我通过创建一个只传递数据帧的“包装器”UDF 解决了这个问题。
关于python - PySpark 中 pandas_udf 的隐式模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54770485/