dataframe - 如何在某些情况下在 Spark DataFrame 中创建新列 'count'

标签 dataframe apache-spark pyspark apache-spark-sql pyspark-dataframes

我有一个关于连接日志的 DataFrame,其中包含 IdtargetIPTime 列。此 DataFrame 中的每条记录都是一个系统的连接事件。 id表示本次连接,targetIP表示本次连接的目标IP地址,Time为连接时间。具有值:

<表类="s-表"> <头> ID 时间 目标IP <正文> 1 1 192.163.0.1 2 2 192.163.0.2 3 3 192.163.0.1 4 5 192.163.0.1 5 6 192.163.0.2 6 7 192.163.0.2 7 8 192.163.0.2

我想在某些条件下创建一个新列:过去2个时间单位内到本次目标IP地址的连接数。所以结果 DataFrame 应该是:

<表类="s-表"> <头> ID 时间 目标IP 计数 <正文> 1 1 192.163.0.1 0 2 2 192.163.0.2 0 3 3 192.163.0.1 1 4 5 192.163.0.1 1 5 6 192.163.0.2 0 6 7 192.163.0.2 1 7 8 192.163.0.2 2

例如ID=7targetIP192.163.0.2 在过去的2个时间单位连接到系统,分别是ID=5ID=6,它们的targetIP也是192.163.0.2。所以 ID=7 的计数是 2。

期待您的帮助。

最佳答案

所以,您基本上需要的是一个窗口函数。

让我们从您的初始数据开始

import org.apache.spark.sql.expressions.Window
import spark.implicits._

case class Event(ID: Int, Time: Int, targetIP: String)

val events = Seq(
    Event(1, 1, "192.163.0.1"),
    Event(2, 2, "192.163.0.2"),
    Event(3, 3, "192.163.0.1"),
    Event(4, 5, "192.163.0.1"),
    Event(5, 6, "192.163.0.2"),
    Event(6, 7, "192.163.0.2"),
    Event(7, 8, "192.163.0.2")
).toDS()

现在我们需要自己定义一个窗口函数

val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)

现在是最有趣的部分:如何计算窗口上的东西?没有简单的方法,所以我们将执行以下操作

  1. 将所有 targetIp 聚合到列表中
  2. 过滤列表以仅查找需要的 ips
  3. 计算列表的大小
val df = events
        .withColumn("tmp", collect_list($"targetIp").over(timeWindow))
        .withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
        .drop($"tmp")

结果将包含我们需要的新列“计数”!

更新:

有一个没有聚合的更短的版本,由@blackbishop 编写,

val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
        .withColumn("count", count("*").over(timeWindow) - lit(1))
        .explain(true)

关于dataframe - 如何在某些情况下在 Spark DataFrame 中创建新列 'count',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66168969/

相关文章:

python - 如何在 python 中的 SparkSession 上启用 ssl

r - 在 R 中的 data.frame 中选择 n 个最新(按日期)条目的优雅方法是什么?

scala - 如何在 Spark SQL 中使用 CROSS JOIN 和 CROSS APPLY

hadoop - 使用$ HIVE METASTORE JARS指定指向正确的配置单元jar的有效路径,或将spark.sql.hive.metastore.version更改为1.2.1。

PySpark 数据帧 : working with duplicated column names after self join

python - 在 Spark 数据框中拆分列

Python pandas - 将 lambda 添加到每一列

python - 从多个 OHLCV 数据帧创建单个 pandas 数据帧

scala - Apache 星火 : get elements of Row by name

apache-spark - 从 Spark 数据框中获取特定行