c# - Forward fill in .NET for Spark

Tags: c# dataframe apache-spark window-functions .net-spark

I am looking at window functions for Spark DataFrames in .NET (C#).

I have a DataFrame df with the columns Year, Month, Day, Hour, Minute, ID, Type and Value:

| Year | Month | Day | Hour | Minute |  ID  |  Type  | Value |
|------|-------|-----|------|--------|------|--------|-------|
| 2021 |   3   |  4  |  8   |   9    |  87  | Type1  | 380.5 |
| 2021 |   3   |  4  |  8   |   10   | null |  null  | null  |
| 2021 |   3   |  4  |  8   |   11   | null |  null  | null  |
| 2021 |   3   |  4  |  8   |   12   | null |  null  | null  |
| 2021 |   3   |  4  |  8   |   13   |  87  | Type1  |  0.0  |
| 2021 |   3   |  4  |  8   |   14   |  87  | Type1  |  0.0  |

I would like to fill the null rows with the value of the previous row, ordered by Year, Month, Day, Hour, Minute, like this:

| Year | Month | Day | Hour | Minute |  ID  |  Type  | Value |
|------|-------|-----|------|--------|------|--------|-------|
| 2021 |   3   |  4  |  8   |   9    |  87  | Type1  | 380.5 |
| 2021 |   3   |  4  |  8   |   10   |  87  | Type1  | 380.5 |
| 2021 |   3   |  4  |  8   |   11   |  87  | Type1  | 380.5 |
| 2021 |   3   |  4  |  8   |   12   |  87  | Type1  | 380.5 |
| 2021 |   3   |  4  |  8   |   13   |  87  | Type1  |  0.0  |
| 2021 |   3   |  4  |  8   |   14   |  87  | Type1  |  0.0  |

So far I have found solutions in Scala using Window and the Lag function, but I am not sure how to do this in C#. In Scala the window would be defined as:

val window = Window.orderBy("Year", "Month", "Day", "Hour", "Minute")

and I would like to add a newValue column using something like:

var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]));

How do I define a window in .NET for Spark and use the Lag function to forward-fill null values?

Best Answer

You were very close. To use Lag with a Window in .NET for Apache Spark, you need:

var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),

}));

var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
            Functions.Lag(df["Value"], 1).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);

This produces:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|    null|
|2021|    3|  4|   8|    12|null| null| null|    null|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+

However, you probably want Last rather than Lag, because Last can skip over nulls (Lag only copies the previous row's raw value, which may itself be null):

// Create the SparkSession and df exactly as in the first example, then:

var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
        Functions.Last(df["Value"], true).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);

Which results in:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|   380.5|
|2021|    3|  4|   8|    12|null| null| null|   380.5|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+
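The difference between the two outputs can be illustrated outside Spark. The following is a plain-Python sketch (not Spark code) of the two window semantics: `Lag(col, 1)` replaces a null with the previous row's raw value, while `Last(col, ignoreNulls: true)` carries the most recent non-null value forward.

```python
def lag_fill(values):
    """Mimic When(IsNull, Lag(col, 1)).Otherwise(col):
    a null is replaced by the previous row's raw value,
    which may itself be null."""
    out = []
    for i, v in enumerate(values):
        if v is None and i > 0:
            out.append(values[i - 1])  # previous raw value, nulls propagate
        else:
            out.append(v)
    return out

def last_fill(values):
    """Mimic When(IsNull, Last(col, ignoreNulls=true)).Otherwise(col):
    a null is replaced by the most recent non-null value."""
    out, prev = [], None
    for v in values:
        if v is not None:
            prev = v  # remember the last non-null value seen
        out.append(prev)
    return out

values = [380.5, None, None, None, 0.0, 0.0]
print(lag_fill(values))   # [380.5, 380.5, None, None, 0.0, 0.0]
print(last_fill(values))  # [380.5, 380.5, 380.5, 380.5, 0.0, 0.0]
```

This is why the first Spark output still contains nulls at minutes 11 and 12, while the Last variant fills the whole gap.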

Hope that helps!

Edit

(the using statements needed to make this work):

using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

Original question "c# - Forward fill in .NET for Spark" on Stack Overflow: https://stackoverflow.com/questions/66764291/
