apache-spark - Pyspark : forward fill with last observation for a DataFrame

标签 apache-spark pyspark apache-spark-sql spark-dataframe

使用 Spark 1.5.1,

我一直在尝试转发填充 空值 的最后一次已知观察一栏我的数据帧。

可以从一个空值开始,在这种情况下,我会用第一个 knwn 观察向后填充这个空值。但是,如果这使代码过于复杂,则可以跳过这一点。

在此 postzero323 为一个非常相似的问题提供了 Scala 中的解决方案。 .

但是,我不知道 Scala 并且我没有成功地在 Pyspark API 代码中“翻译”它。可以用 Pyspark 做到吗?

谢谢你的帮助。

下面是一个简单的示例示例输入:

| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | null 
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | null   
| 1             | 2015-12-05 | null     
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | null
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | null      
| 2             | 2015-12-03 | null     
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | null   
| 2             | 2015-12-06 | U4

和预期的输出:
| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | U1
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | U1
| 1             | 2015-12-05 | U1
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | U2
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | U1
| 2             | 2015-12-03 | U3
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | U3
| 2             | 2015-12-06 | U4

最佳答案

来自 Spark / Scala: forward fill with last observation 的分区示例代码在 pyspark 中显示。这仅适用于可以分区的数据。

加载数据

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))
df.show()

数据框是
+---------+----------+-------+
|cookie_id|    c_date|user_id|
+---------+----------+-------+
|        1|2015-12-01|   null|
|        1|2015-12-02|     U1|
|        1|2015-12-02|     U1|
|        1|2015-12-03|     U2|
|        1|2015-12-04|   null|
|        1|2015-12-05|   null|
|        2|2015-12-04|   null|
|        2|2015-12-03|   null|
|        2|2015-12-02|     U3|
|        2|2015-12-05|   null|
+---------+----------+-------+

用于对分区进行排序的列
# get the sort key
def getKey(item):
    return item.c_date

填充功能。如有必要,可用于填写多列。
# fill function
def fill(x):
    out = []
    last_val = None
    for v in x:
        if v["user_id"] is None:
            data = [v["cookie_id"], v["c_date"], last_val]
        else:
            data = [v["cookie_id"], v["c_date"], v["user_id"]]
            last_val = v["user_id"]
        out.append(data)
    return out

转换为rdd、分区、排序和填充缺失值
# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])

转换回数据帧
df_out = sqlContext.createDataFrame(rdd)
df_out.show()

输出是
+---+----------+----+
| _1|        _2|  _3|
+---+----------+----+
|  1|2015-12-01|null|
|  1|2015-12-02|  U1|
|  1|2015-12-02|  U1|
|  1|2015-12-03|  U2|
|  1|2015-12-04|  U2|
|  1|2015-12-05|  U2|
|  2|2015-12-02|  U3|
|  2|2015-12-03|  U3|
|  2|2015-12-04|  U3|
|  2|2015-12-05|  U3|
+---+----------+----+

关于apache-spark - Pyspark : forward fill with last observation for a DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36019847/

相关文章:

python - Pyspark 将标准列表转换为数据框

python - 如何使用 .whl 文件调用 pyspark 代码?

hadoop - 在Spark中充分利用内存

scala - HBASE SPARK 带过滤器的查询,无需加载所有 hbase

python - PySpark 根据列条件删除重复项

apache-spark - 如何获取Spark Streaming处理的记录总数?

sql - 如何使用sql在数据 block 上创建带有嵌套 map 的表

apache-spark - 使用 dataframe.schema VS dataframe.printSchema() 比较 pyspark 模式

join - Spark如何执行join + filter?它是可扩展的吗?

java - Py4JJavaError : An error occurred while calling z:org. apache.spark.api.python.PythonRDD.collectAndServe。 : java. lang.IllegalArgumentException