hadoop - 在 Spark/Python 中前向填充缺失值

标签 hadoop apache-spark pyspark spark-dataframe apache-spark-mllib

我正在尝试用之前的非空值(如果存在)填充我的 Spark 数据框中的缺失值。我在 Python/Pandas 中做过这种事情,但我的数据对于 Pandas(在一个小集群上)来说太大了,而且我是 Spark 菜鸟。这是 Spark 可以做的事情吗?它可以为多列做吗?如果是这样,如何?如果没有,对于 who Hadoop 工具套件中的替代方法有什么建议吗?

谢谢!

最佳答案

我找到了一种解决方案,无需额外编码即可使用 Window here .所以Jeff是的,有解决办法。完整代码 boelow,我将简要解释它的作用,有关更多详细信息,请查看博客。

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window
window = Window.orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)

# do the fill 
spark_df_filled = df6.withColumn('temperature_filled',  filled_column_temperature)

所以我们的想法是定义一个窗口滑动(更多关于滑动窗口 here )通过始终包含实际行和所有先前行的数据:

    window = Window.orderBy('time')\
           .rowsBetween(-sys.maxsize, 0)

请注意,我们按时间排序,因此数据的顺序是正确的。另请注意,使用“-sys.maxsize”可确保窗口始终包含所有以前的数据,并在自上而下遍历数据时不断增长,但可能有更有效的解决方案。

使用“last”函数,我们总是在该窗口中处理最后一行。通过传递“ignorenulls=True”,我们定义如果当前行为空,则该函数将返回窗口中最近(最后)的非空值。否则使用实际行的值。

完成。

关于hadoop - 在 Spark/Python 中前向填充缺失值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38131982/

相关文章:

eclipse - Hadoop eclipse插件停止工作

apache-spark - Spark Scala - 如何对数据帧行进行分组并将复杂函数应用于组?

amazon-ec2 - 使用spark-submit向EC2集群提交申请

apache-spark - 如何在具有不同内存和核心数量的集群上调整 spark 作业

hadoop - 退出代码和退出状态是否意味着 Spark ?

installation - 如何在本地主机(mac)上安装Mahout(Hadoop)

java - 将 wget 与 Hadoop 一起使用?

eclipse - 在hadoop集群上运行时,不会调用configure(),但可以在Eclipse上调用DistributedCache FIleNotFoundException

hadoop - 为什么 Apache Spark worker executor 以退出状态 1 被杀死?

apache-spark - Web UI 如何计算存储内存(在 Executors 选项卡中)?