我有一个关于连接日志的 DataFrame,其中包含 Id
、targetIP
、Time
列。此 DataFrame 中的每条记录都是一个系统的连接事件。 id表示本次连接,targetIP
表示本次连接的目标IP地址,Time为连接时间。具有值:
我想在某些条件下创建一个新列:过去2个时间单位内到本次目标IP地址的连接数。所以结果 DataFrame 应该是:
例如ID=7
,targetIP
为192.163.0.2
在过去的2个时间单位连接到系统,分别是ID=5
和ID=6
,它们的targetIP
也是192.163.0.2
。所以 ID=7
的计数是 2。
期待您的帮助。
最佳答案
所以,您基本上需要的是一个窗口函数。
让我们从您的初始数据开始
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class Event(ID: Int, Time: Int, targetIP: String)
val events = Seq(
Event(1, 1, "192.163.0.1"),
Event(2, 2, "192.163.0.2"),
Event(3, 3, "192.163.0.1"),
Event(4, 5, "192.163.0.1"),
Event(5, 6, "192.163.0.2"),
Event(6, 7, "192.163.0.2"),
Event(7, 8, "192.163.0.2")
).toDS()
现在我们需要自己定义一个窗口函数
val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)
现在是最有趣的部分:如何计算窗口上的东西?没有简单的方法,所以我们将执行以下操作
- 将所有 targetIp 聚合到列表中
- 过滤列表以仅查找需要的 ips
- 计算列表的大小
val df = events
.withColumn("tmp", collect_list($"targetIp").over(timeWindow))
.withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
.drop($"tmp")
结果将包含我们需要的新列“计数”!
更新:
有一个没有聚合的更短的版本,由@blackbishop 编写,
val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
.withColumn("count", count("*").over(timeWindow) - lit(1))
.explain(true)
关于dataframe - 如何在某些情况下在 Spark DataFrame 中创建新列 'count',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66168969/