apache-spark - 如何使用UDF添加多列?

标签 apache-spark pyspark apache-spark-sql

问题

我想将 UDF 的返回值添加到单独列中的现有数据框。我如何以一种足智多谋的方式实现这一目标?

这是我到目前为止所拥有的一个例子。

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType  

df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)

+-----+------+
| Name|Number|
+-----+------+
|Alive|     4|
+-----+------+

def example(n):
        return [[n+2], [n-2]]

#  schema = StructType([
#          StructField("Out1", ArrayType(IntegerType()), False),
#          StructField("Out2", ArrayType(IntegerType()), False)])

example_udf = udf(example)

现在我可以向数据框中添加一列,如下所示
newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|Output    |
+-----+------+----------+
|Alive|     4|[[6], [2]]|
+-----+------+----------+

但是我不希望这两个值在同一列中,而是在不同的列中。

理想情况下,我想现在拆分输出列以避免调用示例函数两次(每个返回值一次),如 here 所述。和 here ,但是在我的情况下,我得到了一个数组数组,我看不到拆分在那里是如何工作的(请注意,每个数组将包含多个值,用“,”分隔。

结果应该如何

我最终想要的是这个
+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive|     4|   6|   2|
+-----+------+----+----+

请注意, StructType 返回类型的使用是可选的,不一定是解决方案的一部分。

编辑:我注释掉了 StructType 的使用(并编辑了 udf 赋值),因为它不是示例函数的返回类型所必需的。但是,如果返回值类似于
return [6,3,2],[4,3,1]

最佳答案

返回 StructType ,只需使用 Row

df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])


def example(n):
    return Row('Out1', 'Out2')(n + 2, n - 2)


schema = StructType([
    StructField("Out1", IntegerType(), False),
    StructField("Out2", IntegerType(), False)])

example_udf = f.UserDefinedFunction(example, schema)

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")

newDF.show(truncate=False)

关于apache-spark - 如何使用UDF添加多列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47669895/

相关文章:

apache-spark - 使用向量汇编器并在spark scala中提取 "features"作为org.apache.spark.mllib.linalg.Vectors

apache-spark - 如何在 Google Dataproc 中发送失败作业的警报通知?

python - 我们可以动态检索 pyspark 数据框中更新列的前一行值吗

python - 如何在 python 中使用 `map` 将 dict 值转换为整数?

python - Spark 从 IBM Informix 数据库读取数据 "Not enough tokens are specified in the string representation of a date value"

scala - Spark : Parse a Date/Timestamps with different Formats (MM-dd-yyyy HH:mm, MM/dd/yy H:mm ) 在 Dataframe 的同一列

apache-spark - 基于 Salt 安装 Spark 集群的快速指南

apache-spark - 如何将 Spark 实时流与另一个流在其整个生命周期中收集的所有数据一起加入?

python - 从数据库中为每一行 DataFrame Pyspark 获取数据

python - Spark Dataframes 计算每个句子中的单词数