sql - 添加一个额外的列,表示前一列的最接近差异之间的差异

标签 sql apache-spark apache-spark-sql

通过示例可能更容易解释我的场景。假设我有以下数据:

输入时间 一个 1 乙 3 一个 5 B 9

我想在每一行中添加一个额外的列,表示相同类型的所有列之间的最小绝对值差异。因此,对于第一行,类型 A 的所有时间之间的最小差值为 4,因此第 1 列和第 3 列的值为 4,同样,第 2 列和第 4 列的值为 6。

我在 Spark 和 Spark SQL 中执行此操作,因此那里的指导会更有用,但如果需要通过普通 SQL 进行解释,那也会有很大帮助。

最佳答案

一种可能的方法是使用窗口函数。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, min, abs}

val df = Seq(
  ("A", -10), ("A", 1), ("A", 5), ("B", 3), ("B", 9)
).toDF("type", "time")

首先让我们确定按时间排序的连续行之间的差异:

// Partition by type and sort by time
val w1 = Window.partitionBy($"Type").orderBy($"Time")

// Difference between this and previous
val diff = $"time" - lag($"time", 1).over(w1)

然后找到给定类型的所有差异的最小值:

// Partition by time unordered and take unbounded window
val w2 = Window.partitionBy($"Type").rowsBetween(Long.MinValue, Long.MaxValue)

// Minimum difference over type
val minDiff = min(diff).over(w2)

df.withColumn("min_diff",  minDiff).show


// +----+----+--------+
// |type|time|min_diff|
// +----+----+--------+
// |   A| -10|       4|
// |   A|   1|       4|
// |   A|   5|       4|
// |   B|   3|       6|
// |   B|   9|       6|
// +----+----+--------+

如果您的目标是找到当前行与组中任何其他行之间的最小距离,您可以使用类似的方法

import org.apache.spark.sql.functions.{lead, when}

// Diff to previous
val diff_lag = $"time" - lag($"time", 1).over(w1)

// Diff to next
val diff_lead = lead($"time", 1).over(w1) - $"time"

val diffToClosest = when(
  diff_lag < diff_lead || diff_lead.isNull, 
  diff_lag
).otherwise(diff_lead)

df.withColumn("diff_to_closest", diffToClosest)

// +----+----+---------------+
// |type|time|diff_to_closest|
// +----+----+---------------+
// |   A| -10|             11|
// |   A|   1|              4|
// |   A|   5|              4|
// |   B|   3|              6|
// |   B|   9|              6|
// +----+----+---------------+

关于sql - 添加一个额外的列,表示前一列的最接近差异之间的差异,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37624090/

相关文章:

php - 如何在不删除数据库本身的情况下删除数据库中的所有表?

scala - 如何在Spark Scala中对具有5个元素的元组的RDD进行排序?

scala - Spark 标度 : select column name from other dataframe

scala - 使用hadoop和spark在Azure上使用WordCount

java - 如何在 Java/Kotlin 中创建返回复杂类型的 Spark UDF?

arrays - 使用一系列数字范围创建新列

apache-spark - 在 Spark 中合并等分区数据帧

python - 错误 : sqlite3. 操作错误 : no such table: main. m

SQL Group By 对创建的列进行分组?

java - SQL语句不加单引号( ' ')