python - 如何在pyspark中使用pandas UDF并在StructType中返回结果

标签 python pandas pyspark

如何在 pyspark 中驱动基于 panda-udf 的列。我写的udf如下:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("in_type string, in_var string, in_numer int", PandasUDFType.GROUPED_MAP)

def getSplitOP(in_data):
    if in_data is None or len(in_data) < 1:
        return None
    #Input/variable.12-2017
    splt=in_data.split("/",1)
    in_type=splt[0]

    splt_1=splt[1].split(".",1)
    in_var = splt_1[0]

    splt_2=splt_1[1].split("-",1)
    in_numer=int(splt_2[0])

    return (in_type, in_var, in_numer)
    #Expected output: ("input", "variable", 12)

df = df.withColumn("splt_col", getSplitOP(df.In_data))

有人可以帮我找出上面的代码有什么问题,以及为什么它不起作用。

最佳答案

这会起作用:

df = spark.createDataFrame([("input/variable.12-2017",), ("output/invariable.11-2018",)], ("in_data",))
df.show()

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("in_type string, in_var string, in_numer int", PandasUDFType.GROUPED_MAP)
def getSplitOP(pdf):
    in_data = pdf.in_data

    #Input/variable.12-2017
    splt = in_data.apply(lambda x: x.split("/",1))
    in_type = splt.apply(lambda x: x[0])

    splt_1 = splt.apply(lambda x: x[1].split(".",1))
    in_var = splt_1.apply(lambda x: x[0])

    splt_2 = splt_1.apply(lambda x: x[1].split("-",1))
    in_numer = splt_2.apply(lambda x: int(x[0]))

    return pd.DataFrame({"in_type": in_type, "in_var": in_var, "in_numer": in_numer})
    #Expected output: ("input", "variable", 12)

df = df.groupBy().apply(getSplitOP)
df.show()
  • @pandas_udf 后面不能有空行。
  • pandas Series 对象不直接支持 split 等字符串函数。使用 apply 对每个系列进行元素操作。
  • 您使用了 GROUPED_MAP 来返回多列,但您的代码本质上并不按任何内容进行分组。请注意,此处使用的 groupBy 不带任何参数。这需要所有数据都适合单个处理器。

关于python - 如何在pyspark中使用pandas UDF并在StructType中返回结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54831667/

相关文章:

python - 在 pythoncurses 中获取更新的屏幕尺寸

python - 如何在PyQt中实现一个简单的按钮

python递归内存不足

python - 大数的累积二项分布

python - Pandas 数据帧 : Can I fetch other column values along with the column on which group by clause has been applied?

python - Apache Spark 读取 CSV 文件 - ClassNotFoundException

python - 将数据帧列表附加到python中的数据帧列表

python - 如何在Python中对数据框中的分类变量(系列)进行编码?

pyspark - 获取 Spark DataFrame 中两个日期之间的所有日期

ssl - 使用 Pyspark 通过 SSL 连接到 DB2