我有一个数据字典(键代表项目(1,2,3..是项目的ID),它们的值('712907','742068')指用户)。我将其转换为 pandas 数据框:
data_dict = {0: ['712907','742068','326136','667386'],
1: ['667386','742068','742068'],
2: ['326136', '663056', '742068','742068'],
3: ['326136', '663056', '742068'],4: ['326116','742068','663056', '742068'],5: ['326136','326136','663056', '742068']}
df= pd.DataFrame.from_dict(data_dict, orient='index')
我根据用户(“712907”、“742068”、“326136”..)对数据框中的项目进行分组,请参见下图。
dframe = pd.get_dummies(df.stack()).sum(level=0)
sv = sparse.csr_matrix(dframe.as_matrix())
请注意,上面的数据帧(dframe)只是一个小例子,实际的dframe大小是(309235 x 81566)。因此,我想使用spark来计算sv(稀疏矩阵)中行(1,2,3...)之间的余弦相似度。 到目前为止我所取得的成就是:
from pyspark.sql import SQLContext
from pyspark.sql.types import Row
sc = pyspark.SparkContext(appName="cosinesim")
sqlContext = SQLContext(sc)
sv_rdd = sc.parallelize(sv.toarray())
使用example ,我将 rdd 转换为数据帧:
def f(x):
d = {}
for i in range(len(x)):
d[str(i)] = int(x[i])
return d
dfspark = sv_rdd.map(lambda x: Row(**f(x))).toDF()
关注此example ,我添加一个新的“id”列:
row_with_index = Row(*["id"] + dfspark.columns)
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(dfspark.columns)
dfidx = (dfspark.rdd
.zipWithIndex()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + dfspark.schema.fields)))
最后,通过转置矩阵来计算行之间的相似度:
pred = IndexedRowMatrix(dfidx.rdd.map(lambda row: IndexedRow(row.id,row[1:])))
pred1 = pred.toBlockMatrix().transpose().toIndexedRowMatrix()
pred_sims = pred1.columnSimilarities()
如何根据余弦相似度(pred_sims)获取每个项目 0,1,2,3,4 的 top-k id? 我将 CoordinateMatrix 转换为数据框,但不确定如何访问每个 id 的前 k 个项目..
columns = ['from', 'to', 'sim']
vals = pred_sims.entries.map(lambda e: (e.i, e.j, e.value)).collect()
dfsim = sqlContext.createDataFrame(vals, columns)
dfsim.show()
from pyspark.sql.functions import col, desc
for i in range(m):
target_id = int(dataset_u[i])
dfFrom = dfsim.where((col("from") == target_id))
.....
最佳答案
您可以使用窗口函数按每个项目的相似度进行排序,然后使用 row_count():
from pyspark.sql.window import Window
window = Window.partitionBy(dfsim['from']).orderBy(dfsim['sim'].desc())
dfsim.select('*', func.row_number().over(window).alias('row_number')) \
.filter(func.col('row_number') <= 3) \
.show()
+----+---+------------------+----------+
|from| to| sim|row_number|
+----+---+------------------+----------+
| 0| 1|0.6708203932499369| 1|
| 0| 5|0.6123724356957946| 2|
| 4| 5|0.5000000000000001| 1|
| 1| 4|0.7302967433402215| 1|
| 1| 2|0.7302967433402215| 2|
| 2| 3|0.9428090415820636| 1|
| 2| 4|0.8333333333333336| 2|
| 3| 5|0.9428090415820636| 1|
| 3| 4|0.7071067811865477| 2|
+----+---+------------------+----------+
如果您需要将行选择与原始数据框相关联,请加入原始数据。
关于python - PySpark - 如何根据坐标矩阵中表示的相似性获取 top-k id?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48147185/