使用 Spark 1.5.1,
我一直在尝试转发填充 空值 与 的最后一次已知观察一栏我的数据帧。
可以从一个空值开始,在这种情况下,我会用第一个 knwn 观察向后填充这个空值。但是,如果这使代码过于复杂,则可以跳过这一点。
在此 post ,zero323 为一个非常相似的问题提供了 Scala 中的解决方案。 .
但是,我不知道 Scala 并且我没有成功地在 Pyspark API 代码中“翻译”它。可以用 Pyspark 做到吗?
谢谢你的帮助。
下面是一个简单的示例示例输入:
| cookie_ID | Time | User_ID
| ------------- | -------- |-------------
| 1 | 2015-12-01 | null
| 1 | 2015-12-02 | U1
| 1 | 2015-12-03 | U1
| 1 | 2015-12-04 | null
| 1 | 2015-12-05 | null
| 1 | 2015-12-06 | U2
| 1 | 2015-12-07 | null
| 1 | 2015-12-08 | U1
| 1 | 2015-12-09 | null
| 2 | 2015-12-03 | null
| 2 | 2015-12-04 | U3
| 2 | 2015-12-05 | null
| 2 | 2015-12-06 | U4
和预期的输出:
| cookie_ID | Time | User_ID
| ------------- | -------- |-------------
| 1 | 2015-12-01 | U1
| 1 | 2015-12-02 | U1
| 1 | 2015-12-03 | U1
| 1 | 2015-12-04 | U1
| 1 | 2015-12-05 | U1
| 1 | 2015-12-06 | U2
| 1 | 2015-12-07 | U2
| 1 | 2015-12-08 | U1
| 1 | 2015-12-09 | U1
| 2 | 2015-12-03 | U3
| 2 | 2015-12-04 | U3
| 2 | 2015-12-05 | U3
| 2 | 2015-12-06 | U4
最佳答案
来自 Spark / Scala: forward fill with last observation 的分区示例代码在 pyspark 中显示。这仅适用于可以分区的数据。
加载数据
values = [
(1, "2015-12-01", None),
(1, "2015-12-02", "U1"),
(1, "2015-12-02", "U1"),
(1, "2015-12-03", "U2"),
(1, "2015-12-04", None),
(1, "2015-12-05", None),
(2, "2015-12-04", None),
(2, "2015-12-03", None),
(2, "2015-12-02", "U3"),
(2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))
df.show()
数据框是
+---------+----------+-------+
|cookie_id| c_date|user_id|
+---------+----------+-------+
| 1|2015-12-01| null|
| 1|2015-12-02| U1|
| 1|2015-12-02| U1|
| 1|2015-12-03| U2|
| 1|2015-12-04| null|
| 1|2015-12-05| null|
| 2|2015-12-04| null|
| 2|2015-12-03| null|
| 2|2015-12-02| U3|
| 2|2015-12-05| null|
+---------+----------+-------+
用于对分区进行排序的列
# get the sort key
def getKey(item):
return item.c_date
填充功能。如有必要,可用于填写多列。
# fill function
def fill(x):
out = []
last_val = None
for v in x:
if v["user_id"] is None:
data = [v["cookie_id"], v["c_date"], last_val]
else:
data = [v["cookie_id"], v["c_date"], v["user_id"]]
last_val = v["user_id"]
out.append(data)
return out
转换为rdd、分区、排序和填充缺失值
# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])
转换回数据帧
df_out = sqlContext.createDataFrame(rdd)
df_out.show()
输出是
+---+----------+----+
| _1| _2| _3|
+---+----------+----+
| 1|2015-12-01|null|
| 1|2015-12-02| U1|
| 1|2015-12-02| U1|
| 1|2015-12-03| U2|
| 1|2015-12-04| U2|
| 1|2015-12-05| U2|
| 2|2015-12-02| U3|
| 2|2015-12-03| U3|
| 2|2015-12-04| U3|
| 2|2015-12-05| U3|
+---+----------+----+
关于apache-spark - Pyspark : forward fill with last observation for a DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36019847/