通过示例可能更容易解释我的场景。假设我有以下数据:
输入时间
一个 1
乙 3
一个 5
B 9
我想在每一行中添加一个额外的列,表示相同类型的所有列之间的最小绝对值差异。因此,对于第一行,类型 A 的所有时间之间的最小差值为 4,因此第 1 列和第 3 列的值为 4,同样,第 2 列和第 4 列的值为 6。
我在 Spark 和 Spark SQL 中执行此操作,因此那里的指导会更有用,但如果需要通过普通 SQL 进行解释,那也会有很大帮助。
最佳答案
一种可能的方法是使用窗口函数。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, min, abs}
val df = Seq(
("A", -10), ("A", 1), ("A", 5), ("B", 3), ("B", 9)
).toDF("type", "time")
首先让我们确定按时间排序的连续行之间的差异:
// Partition by type and sort by time
val w1 = Window.partitionBy($"Type").orderBy($"Time")
// Difference between this and previous
val diff = $"time" - lag($"time", 1).over(w1)
然后找到给定类型的所有差异的最小值:
// Partition by time unordered and take unbounded window
val w2 = Window.partitionBy($"Type").rowsBetween(Long.MinValue, Long.MaxValue)
// Minimum difference over type
val minDiff = min(diff).over(w2)
df.withColumn("min_diff", minDiff).show
// +----+----+--------+
// |type|time|min_diff|
// +----+----+--------+
// | A| -10| 4|
// | A| 1| 4|
// | A| 5| 4|
// | B| 3| 6|
// | B| 9| 6|
// +----+----+--------+
如果您的目标是找到当前行与组中任何其他行之间的最小距离,您可以使用类似的方法
import org.apache.spark.sql.functions.{lead, when}
// Diff to previous
val diff_lag = $"time" - lag($"time", 1).over(w1)
// Diff to next
val diff_lead = lead($"time", 1).over(w1) - $"time"
val diffToClosest = when(
diff_lag < diff_lead || diff_lead.isNull,
diff_lag
).otherwise(diff_lead)
df.withColumn("diff_to_closest", diffToClosest)
// +----+----+---------------+
// |type|time|diff_to_closest|
// +----+----+---------------+
// | A| -10| 11|
// | A| 1| 4|
// | A| 5| 4|
// | B| 3| 6|
// | B| 9| 6|
// +----+----+---------------+
关于sql - 添加一个额外的列,表示前一列的最接近差异之间的差异,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37624090/