python - 根据列值是否在另一列中将列添加到 PySpark DataFrame

标签 python apache-spark pyspark apache-spark-sql

我有一个 PySpark DataFrame,其结构由

[('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items')

我需要添加一个包含 1 或 0 的列,具体取决于“项目”是否在“fav_items”中。

所以我想

[('u1', 1, [1 ,2, 3], 1), ('u1', 4, [1, 2, 3], 0)]

我如何查找第二列到第三列以确定值,然后如何添加它?

最佳答案

以下代码执行请求的任务。定义了一个用户定义的函数,它接收 DataFrame 的两列作为参数。因此,对于每一行,搜索项目是否在项目列表中。如果找到该项目,则返回 1,否则返回 0。

# Imports
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# First we create a RDD in order to create a dataFrame:
rdd = sc.parallelize([('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])])
df = rdd.toDF(['user', 'item', 'fav_items'])
# Print dataFrame
df.show()

# We make an user define function that receives two columns and do operation
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())

df.select('user', 'item', 'fav_items', function(col('item'), col('fav_items')).alias('result')).show()

结果如下:

+----+----+---------+
|user|item|fav_items|
+----+----+---------+
|  u1|   1|[1, 2, 3]|
|  u1|   4|[1, 2, 3]|
+----+----+---------+

+----+----+---------+------+
|user|item|fav_items|result|
+----+----+---------+------+
|  u1|   1|[1, 2, 3]|     1|
|  u1|   4|[1, 2, 3]|     0|
+----+----+---------+------+

关于python - 根据列值是否在另一列中将列添加到 PySpark DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35898687/

相关文章:

python - Pyspark - 对多个稀疏向量求和(CountVectorizer 输出)

python - 通过过滤对 Pyspark Dataframe 进行分组

python - Django 2.1.7,完整性错误,外键约束失败

python - Flask-SqlAlchemy View 反射

python - 如何在 PySpark 的 UDF 中返回 "Tuple type"?

string - 将字符串表达式转换为实际工作实例表达式

python - 使用 pymongo 查询 mongodb

python - 如何关闭 Flask 中的服务器发送事件连接?

sql - 如何使用 Spark Scala 或 sql 对特定时间间隔内的记录进行分组?

apache-spark - RDD.foreach() 和 RDD.map() 之间的区别