I have the following raw data that I need to clean:
03:35:20.299037 IP 10.0.0.1 > 10.0.0.2: ICMP echo request, id 8321, seq 17, length 64
03:35:20.327290 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
03:35:20.330845 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8311, seq 19, length 64
03:35:20.330892 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8321, seq 17, length 64
03:35:20.330918 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
03:35:20.330969 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64
03:35:20.331041 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64
I need the following output:
+---------------+-----------+-------------+----+------+---+----+
| time_stamp_0|sender_ip_1|receiver_ip_2|rank|count | xi|pi |
+---------------+-----------+-------------+----+------+---+----+
|03:35:20.299037| 10.0.0.1| 10.0.0.2| 1| 7| 1 |0.14|
|03:35:20.327290| 10.0.0.1| 10.0.0.3| 1| 7| 4 |0.57|
|03:35:20.330845| 10.0.0.1| 10.0.0.3| 2| 7| 4 |0.57|
|03:35:20.330892| 10.0.0.1| 10.0.0.3| 3| 7| 4 |0.57|
|03:35:20.330918| 10.0.0.1| 10.0.0.3| 4| 7| 4 |0.57|
|03:35:20.330969| 10.0.0.1| 10.0.0.4| 1| 7| 2 |0.28|
|03:35:20.331041| 10.0.0.1| 10.0.0.4| 2| 7| 2 |0.28|
From the dataframe above, I need the maximum number of repetitions for each (source IP, destination IP) pair as an "xi" column, the total number of rows as a "count" column, and the division xi/count as a "pi" column. My problem starts when I try to compute xi and get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Expression 'count#11L' not supported within a window function.;;
Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, pi#90]
+- Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, _we0#109L, (cast(xi#45 as double) / cast(_we0#109L as double)) AS pi#90]
+- Window [count#11L windowspecdefinition(sender_ip_1#4, receiver_ip_2#5, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#109L], [sender_ip_1#4, receiver_ip_2#5]
...
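To make the target values concrete, here is a plain-Scala sketch (no Spark, just the sample rows above) of the computation I expect; this is only an illustration of how xi, count, and pi relate, not the Spark code itself:

```scala
// Sample rows: (time_stamp_0, sender_ip_1, receiver_ip_2)
val rows = Seq(
  ("03:35:20.299037", "10.0.0.1", "10.0.0.2"),
  ("03:35:20.327290", "10.0.0.1", "10.0.0.3"),
  ("03:35:20.330845", "10.0.0.1", "10.0.0.3"),
  ("03:35:20.330892", "10.0.0.1", "10.0.0.3"),
  ("03:35:20.330918", "10.0.0.1", "10.0.0.3"),
  ("03:35:20.330969", "10.0.0.1", "10.0.0.4"),
  ("03:35:20.331041", "10.0.0.1", "10.0.0.4"))

// count = total number of rows (7 for this sample)
val total = rows.size

// xi = number of repetitions per (sender_ip, receiver_ip) pair
val xi = rows.groupBy(r => (r._2, r._3)).map { case (pair, grp) => pair -> grp.size }

// pi = xi / count for each pair
val pi = xi.map { case (pair, x) => pair -> x.toDouble / total }
```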
I have the following code:
val customSchema = StructType(Array(
StructField("time_stamp_0", StringType, true),
StructField("sender_ip_1", StringType, true),
StructField("receiver_ip_2", StringType, true)))
///////////////////////////////////////////////////make train dataframe
val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/Test/dsetcopy.txt")
val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {
val array1 = array(0).trim.split("IP")
val array2 = array1(1).split(">")
val array3 = array2(1).split(":")
val first = Try(array1(0).trim) getOrElse ""
val second = Try(array2(0).trim) getOrElse ""
val third = Try(array3(0)) getOrElse ""
Row.fromSeq(Seq(first, second, third))
})
val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
val columns1and2_count = Window.partitionBy("sender_ip_1", "receiver_ip_2")
val columns1and2_rank = Window.partitionBy("sender_ip_1", "receiver_ip_2").orderBy("time_stamp_0")
// <-- matches groupBy
//Add count to df
val Dataframe_add_rank = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2_count).distinct()
//Add rank to df
val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()// Dataframe.show()
//Add x(i) to df
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", rank() over columns1and2_rank).distinct()// Dataframe.show()
//Add p(i)=maxrank(x(i))/count
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / ($"count").over(columns1and2_count))// Dataframe.show()
val std_dev=Dataframe_add_rank_count.agg(stddev_pop($"rank"))
val stdDevValue = std_dev.head.getDouble(0)
std_dev.show()
//Add attack status
val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue , 0).otherwise(1))
Dataframe_add_rank_count_xi_pi.show()
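For reference, the last two steps (stddev_pop over the rank column, then the attack flag) can be sketched in plain Scala without Spark; the ranks below are the per-(sender, receiver) ranks of the 7 sample rows, and this is only an illustration of the intended logic:

```scala
// Ranks of the sample rows within their (sender_ip, receiver_ip) groups
val ranks = Seq(1, 1, 2, 3, 4, 1, 2).map(_.toDouble)

// Population standard deviation, matching stddev_pop: sqrt(mean of squared deviations)
val mean = ranks.sum / ranks.size
val stdDev = math.sqrt(ranks.map(r => (r - mean) * (r - mean)).sum / ranks.size)

// attack = 0 when rank < stddev, else 1 (same rule as the when/otherwise above)
val attack = ranks.map(r => if (r < stdDev) 0 else 1)
```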
Update: following the answer below, I get the following output:
+---------------+-----------+-------------+-----+----+---+---+------+
| time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|attack|
+---------------+-----------+-------------+-----+----+---+---+------+
|03:35:20.330969| 10.0.0.1| 10.0.0.4| 2| 1| 4|2.0| 0|
|03:35:20.331041| 10.0.0.1| 10.0.0.4| 2| 2| 4|2.0| 1|
|03:35:20.299037| 10.0.0.1| 10.0.0.2| 1| 1| 4|4.0| 0|
|03:35:20.327290| 10.0.0.1| 10.0.0.3| 4| 1| 4|1.0| 0|
|03:35:20.330845| 10.0.0.1| 10.0.0.3| 4| 2| 4|1.0| 1|
|03:35:20.330892| 10.0.0.1| 10.0.0.3| 4| 3| 4|1.0| 1|
|03:35:20.330918| 10.0.0.1| 10.0.0.3| 4| 4| 4|1.0| 1|
+---------------+-----------+-------------+-----+----+---+---+------+
So xi is not correct. :( Can you help me? Thanks in advance.
Best Answer
If you want to populate the xi column with the maximum count, as you said:
> the maximum number of repetition for each source and destination IP as "xi" column
then you should do:
val maxForXI = Dataframe_add_rank_count.agg(max("rank")).first.get(0)
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", lit(maxForXI))
and you should get:
+---------------+-----------+-------------+-----+----+---+
|time_stamp_0   |sender_ip_1|receiver_ip_2|count|rank|xi |
+---------------+-----------+-------------+-----+----+---+
|03:35:20.330969|10.0.0.1   |10.0.0.4     |2    |1   |4  |
|03:35:20.331041|10.0.0.1   |10.0.0.4     |2    |2   |4  |
|03:35:20.299037|10.0.0.1   |10.0.0.2     |1    |1   |4  |
|03:35:20.327290|10.0.0.1   |10.0.0.3     |4    |1   |4  |
|03:35:20.330845|10.0.0.1   |10.0.0.3     |4    |2   |4  |
|03:35:20.330892|10.0.0.1   |10.0.0.3     |4    |3   |4  |
|03:35:20.330918|10.0.0.1   |10.0.0.3     |4    |4   |4  |
+---------------+-----------+-------------+-----+----+---+
For the last column, pi, you said:
> the division of xi/count as "pi" column
so you should do:
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")
and you should get the desired result:
+---------------+-----------+-------------+-----+----+---+---+
|   time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|
+---------------+-----------+-------------+-----+----+---+---+
|03:35:20.330969|   10.0.0.1|     10.0.0.4|    2|   1|  4|2.0|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|    2|   2|  4|2.0|
|03:35:20.299037|   10.0.0.1|     10.0.0.2|    1|   1|  4|4.0|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|    4|   1|  4|1.0|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|    4|   2|  4|1.0|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|    4|   3|  4|1.0|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|    4|   4|  4|1.0|
+---------------+-----------+-------------+-----+----+---+---+
Edited
Since the question has been edited, the final edited answer is as follows (this code comes after the window definitions):
//Add count to df: the total row count goes in as a literal, not a window aggregate
val cnt = Frist_Dataframe.count
val Dataframe_add_rank = Frist_Dataframe.withColumn("count", lit(cnt))
//Add rank to df
val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()
//Add x(i) to df
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", count($"receiver_ip_2") over columns1and2_count)
//Add p(i) = x(i)/count as a plain column division, not a windowed expression
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")
val std_dev = Dataframe_add_rank_count.agg(stddev_pop($"rank"))
val stdDevValue = std_dev.head.getDouble(0)
std_dev.show()
//Add attack status
val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue, 0).otherwise(1))
final_add_count_rank_xi_attack.show()
Hope you get the right table this time.
On scala - "Not supported within a window function" error when operating on Spark dataframe columns in Scala, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/44909679/