python - Spark DataFrame 运算符(唯一、乘法)

标签 python apache-spark pyspark spark-dataframe

我正在使用带有 Pandas 的 jupyter notebook,但是当我使用 Spark 时,我想使用 Spark DataFrame 来转换或计算而不是 Pandas。请帮我将一些计算转换为 Spark DataFrame 或 RDD。

数据框:

df =
+--------+-------+---------+--------+
| userId | item  |  price  |  value |
+--------+-------+---------+--------+
|  169   | I0111 |  5300   |   1    |
|  169   | I0973 |  70     |   1    |
|  336   | C0174 |  455    |   1    |
|  336   | I0025 |  126    |   1    |
|  336   | I0973 |   4     |   1    |
| 770963 | B0166 |   2     |   1    |
| 1294537| I0110 |  90     |   1    |
+--------+-------+---------+--------+

1.使用 Pandas 计算:
(1)  userItem = df.groupby(['userId'])['item'].nunique()

结果是一个系列对象:
+--------+------+
| userId |      |
+--------+------+
|  169   |   2  |
|  336   |   3  |
| 770963 |   1  |
| 1294537|   1  |
+--------+------+

2. 使用乘法
data_sum = df.groupby(['userId', 'item'])['value'].sum()  --> result is Series object

average_played = np.mean(userItem)  --> result is number

(2)  weighted_games_played = data_sum * (average_played / userItem)

请帮助我在 Spark 上使用 Spark DataFrame 和 Operators 来执行此操作 (1) 和 (2)

最佳答案

您可以使用以下内容实现(1):

import pyspark.sql.functions as f
userItem=df.groupby('userId').agg(f.expr('count(distinct item)').alias('n_item'))

对于(2):
data_sum=df.groupby(['userId','item']).agg(f.sum('value').alias('sum_value'))

average_played=userItem.agg(f.mean('n_item').alias('avg_played'))

data_sum=data_sum.join(userItem, on='userId').crossJoin(average_played)

data_sum=data_sum.withColumn("weighted_games_played", f.expr("sum_value*avg_played/n_item"))

关于python - Spark DataFrame 运算符(唯一、乘法),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46442415/

相关文章:

apache-spark - Spark 到 MySQL 的 JDBC 写入速度低

apache-spark - spark "basePath"选项设置

python - 如何像我们在 pyspark withColumn 中那样在 pandas 中动态创建新列

python - 如何从pyspark中的特定目录加载模块

hadoop - map 转换性能 spark dataframe 与 RDD

python - 沿坐标列表给定的路径矢量化 haversine 距离计算

python - Flask session 不持久

apache-spark - AWS EMR Spark : Error: Cannot load main class from JAR

Python - Sympy 最小值和最大值

python - RPY2:进口商因 .Renviron 失败