Question:
How can I fill the missing values in the price column with the mean, grouping the data by the condition and model columns, in PySpark? My Python (pandas) code is as follows:
cars['price'] = np.ceil(cars['price'].fillna(cars.groupby(['condition', 'model'])['price'].transform('mean')))
I have tried various snippets in PySpark, but each one hits a different error. For example, this code:
cars_new = cars.fillna((cars.groupBy("condition", "model").agg(mean("price"))['avg(price)']))
raises:
ValueError: value should be a float, int, long, string, bool or dict
Best answer:
I'm not sure what your input data looks like, but suppose we have a dataframe like this:
+---------+-----+-----+
|condition|model|price|
+---------+-----+-----+
|A |A |1 |
|A |B |2 |
|A |B |2 |
|A |A |1 |
|A |A |null |
|B |A |3 |
|B |A |null |
|B |B |4 |
+---------+-----+-----+
We want to fill the null values with the mean, computed per condition and model. To do that, we can define a Window partitioned by those columns, compute the avg over it, and then replace the nulls.
Example:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("test").getOrCreate()

data = [
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "A", "price": None},
    {"condition": "B", "model": "A", "price": 3},
    {"condition": "B", "model": "A", "price": None},
    {"condition": "B", "model": "B", "price": 4},
]

# One partition per (condition, model) group; with no orderBy, the window
# frame is the whole partition, so avg() sees every row in the group.
window = Window.partitionBy("condition", "model")

df = spark.createDataFrame(data=data)
df = (
    # Attach each group's mean price as a helper column.
    df.withColumn("avg", F.avg("price").over(window))
    # Keep the original price where present, otherwise use the group mean.
    .withColumn(
        "price", F.when(F.col("price").isNull(), F.col("avg")).otherwise(F.col("price"))
    )
    # The helper column is no longer needed.
    .drop("avg")
)
This gives us:
+---------+-----+-----+
|condition|model|price|
+---------+-----+-----+
|A |A |1.0 |
|A |A |1.0 |
|A |A |1.0 |
|B |B |4.0 |
|B |A |3.0 |
|B |A |3.0 |
|A |B |2.0 |
|A |B |2.0 |
+---------+-----+-----+
A similar question about pyspark — filling missing values with the mean by grouping on multiple columns — can be found on Stack Overflow: https://stackoverflow.com/questions/70186217/