apache-spark - PySpark - ALS 输出中的 RDD 到 DataFrame

标签 apache-spark pyspark rdd apache-spark-mllib apache-spark-sql

我正在使用 Spark 的推荐系统。

训练模型后,我执行了以下代码以获得推荐 model.recommendProductsForUsers(2)

[(10000, (Rating(user=10000, product=14780773, rating=7.35695469892999e-05), 
          Rating(user=10000, product=17229476, rating=5.648606256948921e-05))), 
 (0, (Rating(user=0, product=16750010, rating=0.04405213492474741), 
      Rating(user=0, product=17416511, rating=0.019491942665715176))), 
 (20000, (Rating(user=20000, product=17433348, rating=0.017938298063142653), 
          Rating(user=20000, product=17333969, rating=0.01505112418739887)))]

在本例中,Rec 是一个 RDD,请参见下文。

>>> type(Rec)
<class 'pyspark.rdd.RDD'>

如何将此信息放入数据框中,例如

 User | Product   | Rating 
1000  |  14780773 | 7.3e-05
1000  |  17229675 | 5.6e-05
(...)     (...)     (...) 
2000  |  17333969 | 0.015     

谢谢您的宝贵时间

最佳答案

为了验证,我使用以下 pyspark 代码来重现您的 RDD:

from pyspark.mllib.recommendation import Rating

Rec = sc.parallelize([(10000, (Rating(user=10000, product=14780773, rating=7.35695469892999e-05), 
                               Rating(user=10000, product=17229476, rating=5.648606256948921e-05))), 
                      (0, (Rating(user=0, product=16750010, rating=0.04405213492474741), 
                           Rating(user=0, product=17416511, rating=0.019491942665715176))), 
                      (20000, (Rating(user=20000, product=17433348, rating=0.017938298063142653), 
                               Rating(user=20000, product=17333969, rating=0.01505112418739887)))])

该 RDD 由键值对组成,每个值由一 strip 有评级元组的记录组成。您需要映射 RDD 以仅保留记录,然后将结果分解为每个推荐都有单独的元组。 flatMap(f) 函数将像这样压缩这两个步骤:

flatRec = Rec.flatMap(lambda p: p[1])

这会产生以下形式的 RDD:

[Rating(user=10000, product=14780773, rating=7.35695469892999e-05),
 Rating(user=10000, product=17229476, rating=5.648606256948921e-05),
 Rating(user=0, product=16750010, rating=0.04405213492474741),
 Rating(user=0, product=17416511, rating=0.019491942665715176),
 Rating(user=20000, product=17433348, rating=0.017938298063142653),
 Rating(user=20000, product=17333969, rating=0.01505112418739887)]

现在所需要做的就是使用createDataFrame函数将其转换为DataFrame。每个评级元组都将转换为一个 DataFrame 行,并且由于项目已标记,因此您无需指定架构。

recDF = sqlContext.createDataFrame(flatRec).show()

这将输出以下内容:

+-----+--------+--------------------+
| user| product|              rating|
+-----+--------+--------------------+
|10000|14780773| 7.35695469892999E-5|
|10000|17229476|5.648606256948921E-5|
|    0|16750010| 0.04405213492474741|
|    0|17416511|0.019491942665715176|
|20000|17433348|0.017938298063142653|
|20000|17333969| 0.01505112418739887|
+-----+--------+--------------------+

关于apache-spark - PySpark - ALS 输出中的 RDD 到 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36267602/

相关文章:

apache-spark - 失败的数据 block 作业如何从原来的位置继续下去?

python - 如何在pyspark中并行写入多个parquet文件?

scala - Spark : Split is not a member of org. apache.spark.sql.Row

azure - 在Azure databricks中,将pyspark数据帧写入eventhub花费的时间太长,因为数据帧中有300万条记录

apache-spark - Apache Spark 中的 forEachAsync 与 forEachPartitionAsync 有什么区别?

apache-spark - Spark 流 : Write dataframe to ElasticSearch

python - Pyspark RDD以不同的方式聚合不同的值字段

python - 获取 Spark RDD 中每个键的最大值

python - pyspark countApprox() 似乎与 count() 没有区别

macos - 在我的 Mac 上,hadoop 3.1.0 找到了 native 库,但 spark 2.3.1 没有