scala - How to do sliding window ranking in Spark with Scala?

Tags: scala apache-spark window apache-spark-mllib ranking-functions

I have a dataset:

+-----+-------------------+---------------------+------------------+
|query|similar_queries    |model_score          |count             |
+-----+-------------------+---------------------+------------------+
|shirt|funny shirt        |0.0034038130658784866|189.0             |
|shirt|shirt womens       |0.0019435265241921438|136.0             |
|shirt|watch              |0.001097496453284101 |212.0             |
|shirt|necklace           |6.694577024597908E-4 |151.0             |
|shirt|white shirt        |0.0037413097560623485|217.0             |
|shirt|shoes              |0.0022062579255572733|575.0             |
|shirt|crop top           |9.065831060804897E-4 |173.0             |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |
|shirt|shorts             |0.002669621942466027 |200.0             |
|shirt|black shirt        |0.03264296242546658  |114.0             |
+-----+-------------------+---------------------+------------------+

I first rank the dataset by "count":

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
lazy val countWindowByFreq = Window.partitionBy(col(QUERY)).orderBy(col(COUNT).desc)
val ranked_data = data.withColumn("count_rank", row_number over countWindowByFreq)

+-----+-------------------+---------------------+------------------+----------+
|query|similar_queries    |model_score          |count             |count_rank|
+-----+-------------------+---------------------+------------------+----------+
|shirt|shoes              |0.0022062579255572733|575.0             |1         |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |2         |
|shirt|white shirt        |0.0037413097560623485|217.0             |3         |
|shirt|watch              |0.001097496453284101 |212.0             |4         |
|shirt|shorts             |0.002669621942466027 |200.0             |5         |
|shirt|funny shirt        |0.0034038130658784866|189.0             |6         |
|shirt|crop top           |9.065831060804897E-4 |173.0             |7         |
|shirt|necklace           |6.694577024597908E-4 |151.0             |8         |
|shirt|shirt womens       |0.0019435265241921438|136.0             |9         |
|shirt|black shirt        |0.03264296242546658  |114.0             |10        |
+-----+-------------------+---------------------+------------------+----------+

I'm now trying to rank the contents using a rolling window over row_number (4 rows at a time), ranking within each window by model_score. For example:

In the first window, row_number 1 to 4, the new ranking (a new column) would be

1. polo shirts for men
2. white shirt
3. shoes
4. watch

In the second window, row_number 5 to 8, the new ranking (a new column) would be

5. funny shirt
6. shorts
7. crop top
8. necklace

In the third window, row_number 9 onwards, the new ranking (a new column) would be

9. black shirt 
10. shirt womens

Can someone tell me how to achieve this with Spark and Scala? Is there a predefined function I can use?

I have tried:

lazy val MODEL_RANK = Window.partitionBy(col(QUERY)).orderBy(col(MODEL_SCORE).desc).rowsBetween(0, 3)

But that gives me:

sql.AnalysisException: Window Frame ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;

I also tried .rowsBetween(-3, 0), but that gives me an error as well:

org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN 3 PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;
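
For context: ranking functions such as row_number always run over the fixed frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which is why both rowsBetween variants above are rejected; custom frames are only meaningful for aggregate window functions. Below is a minimal sketch of where rowsBetween is legal, reusing ranked_data and assuming the literal column names "query" and "count":

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// A custom frame works with an aggregate such as sum: rolling sum over the
// current row and the 3 preceding rows within each query partition.
val rollingWindow = Window.partitionBy(col("query")).orderBy(col("count").desc).rowsBetween(-3, 0)
val withRollingSum = ranked_data.withColumn("rolling_count_sum", sum(col("count")).over(rollingWindow))

// row_number().over(rollingWindow) would raise the AnalysisException shown above.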

Best Answer

Since you have already computed count_rank, the next step is to find a way to group the rows into buckets of four. That can be done as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._   // assumes a SparkSession named spark, for the $"colName" syntax

// Integer division of (count_rank - 1) by 4 maps ranks 1-4 to bucket 0, 5-8 to bucket 1, and so on
val ranked_data_grouped = ranked_data
  .withColumn("bucket", (($"count_rank" - 1) / 4).cast(IntegerType))

ranked_data_grouped looks like:

+-----+-------------------+---------------------+------------------+----------+-------+
|query|similar_queries    |model_score          |count             |count_rank|bucket |
+-----+-------------------+---------------------+------------------+----------+-------+
|shirt|shoes              |0.0022062579255572733|575.0             |1         |0      |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |2         |0      |
|shirt|white shirt        |0.0037413097560623485|217.0             |3         |0      |
|shirt|watch              |0.001097496453284101 |212.0             |4         |0      |
|shirt|shorts             |0.002669621942466027 |200.0             |5         |1      |
|shirt|funny shirt        |0.0034038130658784866|189.0             |6         |1      |
|shirt|crop top           |9.065831060804897E-4 |173.0             |7         |1      |
|shirt|necklace           |6.694577024597908E-4 |151.0             |8         |1      |
|shirt|shirt womens       |0.0019435265241921438|136.0             |9         |2      |
|shirt|black shirt        |0.03264296242546658  |114.0             |10        |2      |
+-----+-------------------+---------------------+------------------+----------+-------+

Now, all you have to do is partition by bucket and order by model_score:

// row_number within each bucket, ordered by model_score, gives the new rank
val output = ranked_data_grouped
  .withColumn("finalRank", row_number().over(Window.partitionBy($"bucket").orderBy($"model_score".desc)))

Regarding "scala - How to do sliding window ranking in Spark with Scala?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56122356/
