scala - "Not supported within a window function" error when operating on a Spark DataFrame column in Scala

Tags: scala apache-spark dataframe

I have the following raw data, which I need to clean:

    03:35:20.299037 IP 10.0.0.1 > 10.0.0.2: ICMP echo request, id 8321, seq 17, length 64
    03:35:20.327290 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
    03:35:20.330845 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8311, seq 19, length 64
    03:35:20.330892 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8321, seq 17, length 64
    03:35:20.330918 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
    03:35:20.330969 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64
    03:35:20.331041 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64

I need to produce the following output:

+---------------+-----------+-------------+----+------+---+----+
|   time_stamp_0|sender_ip_1|receiver_ip_2|rank| count| xi|  pi|
+---------------+-----------+-------------+----+------+---+----+
|03:35:20.299037|   10.0.0.1|     10.0.0.2|   1|     7|  1|0.14|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|   1|     7|  4|0.57|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|   2|     7|  4|0.57|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|   3|     7|  4|0.57|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|   4|     7|  4|0.57|
|03:35:20.330969|   10.0.0.1|     10.0.0.4|   1|     7|  2|0.28|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|   2|     7|  2|0.28|
+---------------+-----------+-------------+----+------+---+----+

Based on the DataFrame above, I need the total number of repetitions of each (source IP, destination IP) pair as the "xi" column, the total number of rows as the "count" column, and the division xi/count as the "pi" column. For example, the pair (10.0.0.1, 10.0.0.3) appears 4 times out of 7 rows, so xi = 4 and pi = 4/7 ≈ 0.57. My problem starts when I try to compute xi and get the following error:

   Exception in thread "main" org.apache.spark.sql.AnalysisException: Expression 'count#11L' not supported within a window function.;;
Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, pi#90]
+- Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, _we0#109L, (cast(xi#45 as double) / cast(_we0#109L as double)) AS pi#90]
   +- Window [count#11L windowspecdefinition(sender_ip_1#4, receiver_ip_2#5, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#109L], [sender_ip_1#4, receiver_ip_2#5]

    ...

I have the following code:

    val customSchema = StructType(Array(
      StructField("time_stamp_0", StringType, true),
      StructField("sender_ip_1", StringType, true),
      StructField("receiver_ip_2", StringType, true)))

    // make the training dataframe
    val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/Test/dsetcopy.txt")

    val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {

      val array1 = array(0).trim.split("IP")
      val array2 = array1(1).split(">")
      val array3 = array2(1).split(":")

      val first = Try(array1(0).trim) getOrElse ""
      val second = Try(array2(0).trim) getOrElse ""
      val third = Try(array3(0)) getOrElse ""

      Row.fromSeq(Seq(first, second, third))
    })

    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
    val columns1and2_count = Window.partitionBy("sender_ip_1", "receiver_ip_2")
    val columns1and2_rank = Window.partitionBy("sender_ip_1", "receiver_ip_2").orderBy("time_stamp_0") // <-- matches groupBy

    // Add count to df
    val Dataframe_add_rank = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2_count).distinct()
    // Add rank() to df
    val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()
    // Add x(i) to df
    val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", rank() over columns1and2_rank).distinct()
    // Add p(i) = maxrank(x(i)) / count
    val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / ($"count").over(columns1and2_count))

    val std_dev = Dataframe_add_rank_count.agg(stddev_pop($"rank"))
    val stdDevValue = std_dev.head.getDouble(0)
    std_dev.show()

    // Add attack status
    val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue, 0).otherwise(1))

    Dataframe_add_rank_count_xi_pi.show()
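For reference, the expression the analyzer rejects is the `pi` line: `($"count").over(columns1and2_count)` attaches a window specification to a plain column reference, while Spark only accepts aggregate or ranking functions (count, sum, max, rank, row_number, ...) inside `.over()`; a previously computed column has to stay outside the window expression. A minimal sketch of the distinction, using a hypothetical `df` that already has the three parsed columns plus an `xi` column (window and column names mirror the code above):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    // assumes the same implicits as the code above (import session.implicits._) for the $"..." syntax

    val w = Window.partitionBy("sender_ip_1", "receiver_ip_2")

    // Rejected by the analyzer: `count` here is an ordinary column produced earlier,
    // not an aggregate/window function, so it cannot carry .over()
    //   df.withColumn("pi", $"xi" / ($"count").over(w))

    // Accepted: .over() wraps a real aggregate; the plain column is then used
    // in ordinary arithmetic outside the window expression
    val withGroupSize = df.withColumn("group_size", count(lit(1)).over(w))
    val withRatio     = withGroupSize.withColumn("ratio", $"xi" / $"group_size")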

Update: following the first answer, I now get the following output:

+---------------+-----------+-------------+-----+----+---+---+------+
|   time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|attack|
+---------------+-----------+-------------+-----+----+---+---+------+
|03:35:20.330969|   10.0.0.1|     10.0.0.4|    2|   1|  4|2.0|     0|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|    2|   2|  4|2.0|     1|
|03:35:20.299037|   10.0.0.1|     10.0.0.2|    1|   1|  4|4.0|     0|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|    4|   1|  4|1.0|     0|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|    4|   2|  4|1.0|     1|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|    4|   3|  4|1.0|     1|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|    4|   4|  4|1.0|     1|
+---------------+-----------+-------------+-----+----+---+---+------+

So xi is not correct. :( Can you help me? Thanks in advance.

Best Answer

If you want to populate xi with the maximum count, as you said:

the maximum number of repetition for each source and destination IP as "xi" column

then you should do:

val maxForXI = Dataframe_add_rank_count.agg(max("rank")).first.get(0)
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", lit(maxForXI))

and you should get:

+---------------+-----------+-------------+-----+----+---+
|time_stamp_0   |sender_ip_1|receiver_ip_2|count|rank|xi |
+---------------+-----------+-------------+-----+----+---+
|03:35:20.330969|10.0.0.1   | 10.0.0.4    |2    |1   |4  |
|03:35:20.331041|10.0.0.1   | 10.0.0.4    |2    |2   |4  |
|03:35:20.299037|10.0.0.1   | 10.0.0.2    |1    |1   |4  |
|03:35:20.327290|10.0.0.1   | 10.0.0.3    |4    |1   |4  |
|03:35:20.330845|10.0.0.1   | 10.0.0.3    |4    |2   |4  |
|03:35:20.330892|10.0.0.1   | 10.0.0.3    |4    |3   |4  |
|03:35:20.330918|10.0.0.1   | 10.0.0.3    |4    |4   |4  |
+---------------+-----------+-------------+-----+----+---+

And for the last column, pi, you said:

the devision of xi/count as "pi" column

so you should do:

val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")

and you should get the desired result:

+---------------+-----------+-------------+-----+----+---+---+
|   time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|
+---------------+-----------+-------------+-----+----+---+---+
|03:35:20.330969|   10.0.0.1|     10.0.0.4|    2|   1|  4|2.0|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|    2|   2|  4|2.0|
|03:35:20.299037|   10.0.0.1|     10.0.0.2|    1|   1|  4|4.0|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|    4|   1|  4|1.0|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|    4|   2|  4|1.0|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|    4|   3|  4|1.0|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|    4|   4|  4|1.0|
+---------------+-----------+-------------+-----+----+---+---+
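As a side note, `lit(maxForXI)` writes the same global maximum (here 4) into every row, which is why the updated output in the question shows xi = 4 for every IP pair. If xi should instead be the per-pair repetition count from the expected output, the aggregate can be applied over the partition window itself; a short sketch, reusing `columns1and2_count` and `Dataframe_add_rank_count` from the question (the edited code below achieves the same thing with a windowed count):

// per-pair maximum of rank == number of rows for that (sender_ip_1, receiver_ip_2) pair
val xiPerPair = Dataframe_add_rank_count.withColumn("xi", max($"rank").over(columns1and2_count))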

Edited

Since the question has been edited, the final edited answer is below (this is the code that comes after the window definitions):

// Add count (total number of rows) to df
val cnt = Frist_Dataframe.count
val Dataframe_add_rank = Frist_Dataframe.withColumn("count", lit(cnt))
// Add rank() to df
val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()
// Add x(i) to df
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", count($"receiver_ip_2") over columns1and2_count)
// Add p(i) = maxrank(x(i)) / count
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")

val std_dev = Dataframe_add_rank_count.agg(stddev_pop($"rank"))
val stdDevValue = std_dev.head.getDouble(0)
std_dev.show()

// Add attack status
val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue, 0).otherwise(1))
final_add_count_rank_xi_attack.show()
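If pi should be limited to two decimal places, as in the expected output at the top of the question, the division can be wrapped in Spark's round function; a small sketch under that assumption (assuming round is imported along with the other functions):

// e.g. 4.0 / 7 = 0.5714... becomes 0.57
val Dataframe_rounded_pi = Dataframe_add_rank_count_xi.withColumn("pi", round($"xi" / $"count", 2))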

Hope you get the right table this time.

Original question on Stack Overflow: https://stackoverflow.com/questions/44909679/
