python - 在 pyspark 中转换 ALS 的输入数据

标签 python pyspark apache-spark-mllib apache-spark-ml collaborative-filtering

我的推荐输入数据如下:

[(u'97990079', u'18_34', 2),
 (u'585853655', u'11_8', 1),
 (u'1398696913', u'6_20', 1),
 (u'612168869', u'7_16', 1),
 (u'2272846159', u'11_17', 2)]

格式为(user_id, item_id, score)

如果我没理解错的话,spark中的ALS在训练前一定要将user_id,item_id转化为整数?如果是这样,我现在能想到的唯一解决方案是使用字典并将每个 user_iditem_id 映射到像

这样的整数
dictionary for item_id : {'18_34': 1, '18_35':2, ...}
dictionary for user_id : {'97990079':1, '585853655':2, ...}

但我想知道是否有其他优雅的方式来做到这一点?谢谢!

最佳答案

处理此问题的一种方法是使用 ML 转换器。首先让我们将您的数据转换为 DataFrame:

ratings_df = sqlContext.createDataFrame([
    (u'97990079', u'18_34', 2), (u'585853655', u'11_8', 1),
    (u'1398696913', u'6_20', 1), (u'612168869', u'7_16', 1),
    (u'2272846159', u'11_17', 2)],
    ("user_id", "item_id_str", "rating"))

接下来我们需要一个StringIndexer

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="item_id_str", outputCol="item_id")

最后让我们使用索引器转换 DataFrame:

from pyspark.sql.functions import col

transformed = (indexer
    .fit(ratings_df)
    .transform(ratings_df)
    .withColumn("user_id", col("user_id").cast("integer"))
    .select("user_id", "item_id", "rating"))

并转换为RDD[Rating]:

from pyspark.mllib.recommendation import Rating

ratings_rdd = transformed.map(lambda r: Rating(r.user_id, r.item_id, r.rating))

在较新版本的 Spark 中,您可以跳过转换,直接使用 ml.recommendation.ALS:

from pyspark.ml.recommendation import ALS

als = (ALS(userCol="user_id", itemCol="item_id", ratingCol="rating")
  .fit(transformed))

关于python - 在 pyspark 中转换 ALS 的输入数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33723099/

相关文章:

python - 在 python 2.7 (windows 8.1) 中安装 OpenSSL 时出现 InsecurePlatformWarning

python - 新的 Dataframe 列作为其他行的通用函数(spark)

pyspark - 基于某些条件在 databricks notebook 中执行 cmd 单元格

python - 如何使用 scala 或 python 在 apache spark 中运行多线程作业?

apache-spark - 如何设置Spark Kmeans初始中心

python - sqlalchemy/sql : cross model relationships, 从 'cousin' 模型获取信息

python - 关于获取数据的错误

python - 有条件合并数据帧行

python - Pyspark - 不确定如何将以下 X 行的总和分配给现有行值

java - 将 JavaRDD 字符串转换为 JavaRDD vector