pyspark - 通过对多列进行分组来用平均值填充缺失值

标签 pyspark apache-spark-sql

描述:” 如何用平均值填充 price 列中的缺失值,并按 Pyspark 中的 condition 和 model 列对数据进行分组?我的Python代码如下:cars['price'] = np.ceil(cars['price'].fillna(cars.groupby(['condition', 'model' ])['price'] .transform('mean')))

错误: 我在 Pyspark 中尝试不同的代码,但每次都会遇到不同的错误。像这样,代码:cars_new=cars.fillna((cars.groupBy("condition", "model").agg(mean("price"))['avg(price)'])) 错误:

ValueError: value should be a float, int, long, string, bool or dict

数据框

enter image description here

最佳答案

不确定您的输入数据是什么样子,但假设我们有一个如下所示的数据框:

+---------+-----+-----+                                                         
|condition|model|price|
+---------+-----+-----+
|A        |A    |1    |
|A        |B    |2    |
|A        |B    |2    |
|A        |A    |1    |
|A        |A    |null |
|B        |A    |3    |
|B        |A    |null |
|B        |B    |4    |
+---------+-----+-----+

我们希望用平均值填充空值,但超过条件模型

为此,我们可以定义一个Window,计算avg,然后替换null

示例:

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("test").getOrCreate()
data = [
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "A", "price": None},
    {"condition": "B", "model": "A", "price": 3},
    {"condition": "B", "model": "A", "price": None},
    {"condition": "B", "model": "B", "price": 4},
]

window = Window.partitionBy(["condition", "model"]).orderBy("condition")
df = spark.createDataFrame(data=data)
df = (
    df.withColumn("avg", F.avg("price").over(window))
    .withColumn(
        "price", F.when(F.col("price").isNull(), F.col("avg")).otherwise(F.col("price"))
    )
    .drop("avg")
)

这给了我们:

+---------+-----+-----+
|condition|model|price|
+---------+-----+-----+
|A        |A    |1.0  |
|A        |A    |1.0  |
|A        |A    |1.0  |
|B        |B    |4.0  |
|B        |A    |3.0  |
|B        |A    |3.0  |
|A        |B    |2.0  |
|A        |B    |2.0  |
+---------+-----+-----+

关于pyspark - 通过对多列进行分组来用平均值填充缺失值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70186217/

相关文章:

apache-spark - 从 Spark 数据框中选择不同值的最有效方法是什么?

apache-spark - 什么是 DataProcSparkOperator?

python - 使用 posexplode 分解带有索引的嵌套 JSON

pyspark - to_timestamp 什么时候产生 19xx 的结果?

apache-spark - Spark RDD 中按行删除重复项

python - Pyspark 将 rdd 转换为带有 null 的数据帧

java - 如何获取变量中的数据帧值

amazon-web-services - 如何通过 Cloudformation 在 EMR 上运行 Spark 作业

scala - 如何根据分配的优先级选择最重要的行?

Scala Spark 过滤掉重复出现的零值