python - 如何迭代 pyspark.sql.Column?

标签 python pyspark

我有一个 pyspark DataFrame,我想获取一个特定的列并迭代它的值。例如:

userId    itemId
1         2
2         2
3         7
4         10

我通过 df.userId 获取 userId 列,并且我想为该列中的每个 userId 应用一个方法。我怎样才能做到这一点?

最佳答案

您的问题对于您要应用的功能类型不是很具体,因此我创建了一个示例,根据 itemId 的值添加项目描述。

首先让我们导入相关库并创建数据:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = spark.createDataFrame([(1,2),(2,2),(3,7),(4,10)], ['userId', 'itemId'])

其次,创建函数并将其转换为 PySpark 可以使用的 UDF 函数:

def item_description(itemId):
    items = {2  : "iPhone 8",
             7  : "Apple iMac",
             10 : "iPad"}
    return items[itemId]

item_description_udf = udf(item_description,StringType())

最后,为 ItemDescription 添加一个新列,并用 item_description_udf 函数返回的值填充它:

df = df.withColumn("ItemDescription",item_description_udf(df.itemId))    
df.show()

这给出了以下输出:

+------+------+---------------+
|userId|itemId|ItemDescription|
+------+------+---------------+
|     1|     2|       iPhone 8|
|     2|     2|       iPhone 8|
|     3|     7|     Apple iMac|
|     4|    10|           iPad|
+------+------+---------------+

关于python - 如何迭代 pyspark.sql.Column?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46560696/

相关文章:

python - kivy iOS部署错误

python - 什么决定了将使用哪个索引 `pip`?

python - 当我的 Python 应用程序部署到 GAE 上时,我如何知道时间戳?

python - 如何使用 Python 检查 Git Repo 是否有未提交的更改

group-by - pyspark:聚合列中最常见的值

python - 寻找列表中的下一个障碍

python-3.x - 如何在 AWS EMR 上设置 PYTHONHASHSEED

python - 环境变量 PYSPARK_PYTHON 和 PYSPARK_DRIVER_PYTHON

python - 为什么 registerTempTable 从数据框中删除一些行?

apache-spark - Spark Standalone - Tmp 文件夹