python - PySpark 中 pandas_udf 的隐式模式?

标签 python apache-spark pyspark user-defined-functions

This answer很好地解释了如何使用 pyspark 的 groupby 和 pandas_udf 进行自定义聚合。但是,我不可能像示例的这一部分所示那样手动声明我的架构

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

因为我将返回 100 多个具有自动生成的名称的列。有什么方法可以告诉 PySpark 隐含地使用我的函数返回的架构并假设所有工作节点都相同?该架构也会在运行期间发生变化,因为我将不得不尝试使用我想要使用的预测变量,因此架构生成的自动化过程可能是一个选项...

最佳答案

基于 Sanxofons comment,我对如何自己实现这个有了一个想法:

from pyspark.sql.types import *

mapping = {"float64": DoubleType,
           "object":StringType,
           "int64":IntegerType} # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
  column_types  = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
  schema = StructType(column_types)
  return schema

我所做的是获取样本 pandas df,将其传递给函数,然后查看返回的内容:

dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)

这似乎对我有用。问题是它有点递归(需要定义函数来获取模式,将模式定义为 udf)。我通过创建一个只传递数据帧的“包装器”UDF 解决了这个问题。

关于python - PySpark 中 pandas_udf 的隐式模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54770485/

相关文章:

c# - 在 C# 或 Unity 中执行 Python 脚本

scala - 如何在 Spark 中转置 RDD

eclipse - 无法从Eclipse将Spark与Hortonworks Sandbox连接

scala - 试图让 Apache Spark 与 IntelliJ 一起工作

sql - pyspark.sql.utils.ParseException : "\nmismatched input" in PYSPARKSQL

apache-spark - 如何在 PySpark 中将 Pandas 的 DatetimeIndex 转换为 DataFrame?

python - 在groupby之后找到具有对应值的nlargest(2)

Python将简历图像存储在mongodb gridfs中

apache-spark - Spark fillNa 不替换空值

python - C++ 中是否有相当于 "for ... else"Python 循环的方法?