pyspark - 转发填充新行以弥补缺失日期

标签 pyspark apache-spark-sql

我目前有一个数据集,由变量“聚合器”按小时增量分组。这个每小时数据中存在间隙,我理想的情况是用映射到 x 列中变量的前一行向前填充行。

我已经看到一些使用 PANDAS 解决类似问题的解决方案,但理想情况下我想了解如何最好地使用 pyspark UDF 来解决这个问题。

我最初考虑过使用 PANDAS 进行以下操作,但也很难实现这一点,以忽略聚合器作为第一遍:

df = df.set_index(keys=[df.timestamp]).resample('1H', fill_method='ffill')

但理想情况下我想避免使用 PANDAS。

在下面的示例中,我有两行缺失的每小时数据(标记为 MISSING)。

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| MISSING              | MISSING    |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| MISSING              | MISSING    |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

这里的预期输出如下:

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| 2018-12-27T11:00:00Z | A          |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| 2018-12-27T12:00:00Z | B          |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

感谢您的帮助。

谢谢。

最佳答案

这是填补缺失时间的解决方案。使用 windows、lag 和 udf。只需稍作修改,它也可以延长数天。

from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dateutil.relativedelta import relativedelta

def missing_hours(t1, t2):
    return [t1 + relativedelta(hours=-x) for x in range(1, t1.hour-t2.hour)]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

df = spark.read.csv('dates.csv',header=True,inferSchema=True)

window = Window.partitionBy("aggregator").orderBy("timestamp")

df_mising = df.withColumn("prev_timestamp",lag(col("timestamp"),1, None).over(window))\
       .filter(col("prev_timestamp").isNotNull())\
       .withColumn("timestamp", explode(missing_hours_udf(col("timestamp"), col("prev_timestamp"))))\
       .drop("prev_timestamp")

df.union(df_mising).orderBy("aggregator","timestamp").show()

结果

+-------------------+----------+
|          timestamp|aggregator|
+-------------------+----------+
|2018-12-27 09:00:00|         A|
|2018-12-27 10:00:00|         A|
|2018-12-27 11:00:00|         A|
|2018-12-27 12:00:00|         A|
|2018-12-27 13:00:00|         A|
|2018-12-27 09:00:00|         B|
|2018-12-27 10:00:00|         B|
|2018-12-27 11:00:00|         B|
|2018-12-27 12:00:00|         B|
|2018-12-27 13:00:00|         B|
|2018-12-27 14:00:00|         B|
+-------------------+----------+

关于pyspark - 转发填充新行以弥补缺失日期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55361378/

相关文章:

java - Apache Spark + Java : "java.lang.AssertionError: assertion failed" in ExpressionEncoder

json - Spark SQL DataFrame pretty-print

python - 从 Spark 1.0 开始,不推荐通过 'pyspark' 运行 python 应用程序

pandas - 从格式为 d-m-yyyy(Pyspark 或 Pandans)的字符串中创建 Unix 时间戳

apache-spark - pyspark。数据框中的 zip 数组

Scala Spark - 引用另一个数据帧的映射函数

apache-spark - 从 pyspark 数据帧中减去平均值

python - 值错误: Length of object (3) does not match with length of fields

python - 协同过滤中的多重特征——spark

apache-spark - DataFrameReader 在读取 avro 文件时抛出 "Unsupported type NULL"