apache-spark - 如何计算匹配相关条件的行数?

标签 apache-spark apache-spark-sql

我的表格如下所示:

TripID    | Name | State  
    1     | John |  OH       
    2     | John |  OH  
    3     | John |  CA  
    4     | John |  OH  
    1     | Mike |  CA  
    2     | Mike |  CA  
    3     | Mike |  OH

我想统计先前往俄亥俄州,然后前往加利福尼亚州的人数。

在上面的例子中,只有 John,所以答案应该是 1。

所以我想知道如何在SQL过滤中设置一定的顺序来过滤结果?

最佳答案

我可能误解了你的问题,但如果你问的是:

how many people travelled to OH first and then to CA.

查询(的草图)可能如下所示:

scala> trips.show
+------+----+-----+
|tripid|name|state|
+------+----+-----+
|     1|John|   OH|
|     2|John|   OH|
|     3|John|   CA|
|     4|John|   OH|
|     1|Mike|   CA|
|     2|Mike|   CA|
|     3|Mike|   OH|
+------+----+-----+

scala> trips.orderBy("name", "tripid").groupBy("name").agg(collect_list("state")).show
+----+-------------------+
|name|collect_list(state)|
+----+-------------------+
|John|   [OH, OH, CA, OH]|
|Mike|       [CA, CA, OH]|
+----+-------------------+

正如我现在所看到的,您有两个选择:

  1. (困难)编写一个用户定义的聚合函数 (UDAF) 来执行聚合(并将 collect_list 替换为包含不同状态的行程)。

  2. (更简单)编写一个用户定义函数 (UDF),其功能与上面的 UDAF 类似(但在 collect_list 收集值之后)。

  3. (简单)使用functions (如 explode 和/或 window )

让我们采用简单的解决方案(不一定是最有效的!)。

事实证明groupBy早期并不是真正必要的(!)您可以单独使用窗口聚合来处理它(使用两次)。

import org.apache.spark.sql.expressions.Window
val byName = Window.partitionBy("name").orderBy("tripid")

val distinctStates = trips.withColumn("rank", rank over byName).dropDuplicates("name", "state").orderBy("name", "rank")

scala> distinctStates.show
+------+----+-----+----+
|tripid|name|state|rank|
+------+----+-----+----+
|     1|John|   OH|   1|
|     3|John|   CA|   3|
|     1|Mike|   CA|   1|
|     3|Mike|   OH|   3|
+------+----+-----+----+

// rank again but this time use the pre-calculated distinctStates dataset
val distinctStatesRanked = distinctStates.withColumn("rank", rank over byName).orderBy("name", "rank")

scala> distinctStatesRanked.show
+------+----+-----+----+
|tripid|name|state|rank|
+------+----+-----+----+
|     1|John|   OH|   1|
|     3|John|   CA|   2|
|     1|Mike|   CA|   1|
|     3|Mike|   OH|   2|
+------+----+-----+----+

val left = distinctStatesRanked.filter($"state" === "OH").filter($"rank" === 1)
val right = distinctStatesRanked.filter($"state" === "CA").filter($"rank" === 2)
scala> left.join(right, "name").show
+----+------+-----+----+------+-----+----+
|name|tripid|state|rank|tripid|state|rank|
+----+------+-----+----+------+-----+----+
|John|     1|   OH|   1|     3|   CA|   2|
+----+------+-----+----+------+-----+----+

关于apache-spark - 如何计算匹配相关条件的行数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43768641/

相关文章:

apache-spark - 在Kerberos化Hadoop环境中启用了Spark并启用了高可用性:Spark SQL仅在写入任务后才能读取数据

apache-spark - 使用 LIMIT 两次评估 Spark SQL 查询时获得相同的结果

python - 在 Spark 中关闭 MYSQL JDBC 连接

scala - Spark 数与拍摄和长度

sql - SparkSQL : conditional sum using two columns

Scala Spark 用 NULL 替换空字符串

python - rlike中的pyspark数据帧如何从数据帧列之一逐行传递字符串值

apache-spark - Spark.table 与 sql() AccessControlException

python - 如何修复pyspark中的 'Container exited with a non-zero exit code 143'错误

java - Spark : Splitting using delimiter doesn't work with commas