pyspark - 插入缺失的日期行并在新行中插入旧值 PySpark

标签 pyspark

我有一个包含人物、体重和时间戳的 DataFrame:

+-----------+-------------------+------+
|     person|          timestamp|weight|
+-----------+-------------------+------+
|          1|2019-12-02 14:54:17| 49.94|
|          1|2019-12-03 08:58:39| 50.49|
|          1|2019-12-06 10:44:01| 50.24|
|          2|2019-12-02 08:58:39| 62.32|
|          2|2019-12-04 10:44:01| 65.64|
+-----------+-------------------+------+

我想填写这样每个人都有每个日期的东西,这意味着上面应该是:

+-----------+-------------------+------+
|     person|          timestamp|weight|
+-----------+-------------------+------+
|          1|2019-12-02 14:54:17| 49.94|
|          1|2019-12-03 08:58:39| 50.49|
|          1|2019-12-04 00:00:01| 50.49|
|          1|2019-12-05 00:00:01| 50.49|
|          1|2019-12-06 10:44:01| 50.24|
|          1|2019-12-07 00:00:01| 50.24|
|          1|2019-12-08 00:00:01| 50.24|
|          2|2019-12-02 08:58:39| 62.32|
|          2|2019-12-03 00:00:01| 62.32|
|          2|2019-12-04 10:44:01| 65.64|
|          2|2019-12-05 00:00:01| 65.64|
|          2|2019-12-06 00:00:01| 65.64|
|          2|2019-12-07 00:00:01| 65.64|
|          2|2019-12-08 00:00:01| 65.64|
+-----------+-------------------+------+

我已经定义了一个新表,它使用 datediff 来包含最小和最大日期之间的所有日期:

min_max_date = df_person_weights.select(min("timestamp"), max("timestamp")) \
        .withColumnRenamed("min(timestamp)", "min_date") \
        .withColumnRenamed("max(timestamp)", "max_date")

min_max_date = min_max_date.withColumn("datediff", datediff("max_date", "min_date")) \
        .withColumn("repeat", expr("split(repeat(',', datediff), ',')")) \
        .select("*", posexplode("repeat").alias("date", "val")) \
        .withColumn("date", expr("date_add(min_date, date)"))

这给了我一个新的 DataFrame,其中包含如下日期:

+----------+
|      date|
+----------+
|2019-12-03|    
|2019-12-03|
|2019-12-04|
|2019-12-05|
|2019-12-06|
|2019-12-07|
|2019-12-08|
+----------+

我尝试过不同的连接方式,例如:

min_max_date.join(df_price_history, min_max_date.date != df_price_history.date, "leftouter")

但是我没有得到我需要的结果,有人可以帮忙吗?如何合并我现在拥有的信息?

最佳答案

您正在寻找前向填充数据集。这变得有点复杂,因为您需要按类别(人)进行操作。

一种方法是这样的:创建一个新的 DataFrame,其中包含您想要为其赋值的每个人的所有日期(见下文,这只是 dates_by_person)。

然后,将原始 DataFrame 左连接到这个 DataFrame,这样您就可以开始创建缺失的行。

接下来,使用窗口函数在每组 person 中查找,按日期排序,最后一个非空权重。如果您每个日期可以有多个条目(因此一个人在一个特定日期有多个填写的记录),您还必须按时间戳列排序。

最后合并列,以便任何空字段都被预期值替换。

from datetime import datetime, timedelta
from itertools import product

import pyspark.sql.functions as psf
from pyspark.sql import Window

data = (  # recreate the DataFrame
    (1, datetime(2019, 12, 2, 14, 54, 17), 49.94),
    (1, datetime(2019, 12, 3, 8, 58, 39), 50.49),
    (1, datetime(2019, 12, 6, 10, 44, 1), 50.24),
    (2, datetime(2019, 12, 2, 8, 58, 39), 62.32),
    (2, datetime(2019, 12, 4, 10, 44, 1), 65.64))
df = spark.createDataFrame(data, schema=("person", "timestamp", "weight"))

min_max_timestamps = df.agg(psf.min(df.timestamp), psf.max(df.timestamp)).head()
first_date, last_date = [ts.date() for ts in min_max_timestamps]
all_days_in_range = [first_date + timedelta(days=d)
                     for d in range((last_date - first_date).days + 1)]
people = [row.person for row in df.select("person").distinct().collect()]
dates_by_person = spark.createDataFrame(product(people, all_days_in_range),
                                        schema=("person", "date"))

df2 = (dates_by_person.join(df,
                            (psf.to_date(df.timestamp) == dates_by_person.date)
                            & (dates_by_person.person == df.person),
                            how="left")
       .drop(df.person)
       )
wind = (Window
        .partitionBy("person")
        .rangeBetween(Window.unboundedPreceding, -1)
        .orderBy(psf.unix_timestamp("date"))
        )
df3 = df2.withColumn("last_weight",
                     psf.last("weight", ignorenulls=True).over(wind))
df4 = df3.select(
    df3.person,
    psf.coalesce(df3.timestamp, psf.to_timestamp(df3.date)).alias("timestamp"),
    psf.coalesce(df3.weight, df3.last_weight).alias("weight"))
df4.show()
# +------+-------------------+------+
# |person|          timestamp|weight|
# +------+-------------------+------+
# |     1|2019-12-02 14:54:17| 49.94|
# |     1|2019-12-03 08:58:39| 50.49|
# |     1|2019-12-04 00:00:00| 50.49|
# |     1|2019-12-05 00:00:00| 50.49|
# |     1|2019-12-06 10:44:01| 50.24|
# |     2|2019-12-02 08:58:39| 62.32|
# |     2|2019-12-03 00:00:00| 62.32|
# |     2|2019-12-04 10:44:01| 65.64|
# |     2|2019-12-05 00:00:00| 65.64|
# |     2|2019-12-06 00:00:00| 65.64|
# +------+-------------------+------+

编辑:正如 David 在评论中所建议的那样,如果你有很多人,dates_by_people 的构造可以通过不需要将所有内容都交给驱动程序的方式来完成.在这个例子中,我们讨论的是少量整数,没什么大不了的。但如果它变大,请尝试:

dates = spark.createDataFrame(((d,) for d in all_days_in_range),
                              schema=("date",))
people = df.select("person").distinct()
dates_by_person = dates.crossJoin(people)

关于pyspark - 插入缺失的日期行并在新行中插入旧值 PySpark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59240277/

相关文章:

python - 带 Spark 的 iPython 笔记本在 SparkContext 中出现错误

apache-spark - Pyspark - FileInputDStream : Error finding new files

python - Pyspark 依赖任何 ID 的任何滑动窗口

python - 返回 pyspark GroupedData 中具有最佳字段的行

apache-spark - 将数据帧转换为 libsvm 格式

json - pyspark 将新的嵌套数组添加到现有的 json 文件中

sql - Pyspark Dataframes 作为 View

python - 使用 PySpark UDF 时记录工作人员 ID

PySpark-将 map 功能添加为列

python - PySpark 在嵌套数组中反转 StringIndexer