sql - Find the closest time between two tables in Spark

Tags: sql apache-spark pyspark apache-spark-sql

I am using pyspark and I have two DataFrames like this:

user         time          bus
 A    2016/07/18 12:00:00   1
 B    2016/07/19 12:00:00   2
 C    2016/07/20 12:00:00   3

bus          time          stop
 1    2016/07/18 11:59:40   sA
 1    2016/07/18 11:59:50   sB
 1    2016/07/18 12:00:05   sC
 2    2016/07/19 11:59:40   sB
 2    2016/07/19 12:00:10   sC
 3    2016/07/20 11:59:55   sD
 3    2016/07/20 12:00:10   sE

Now I want to find out, for each user, the stop they reported from, by matching the bus number in the second table and taking the record with the closest time.

For example, in table 1 user A reported at 2016/07/18 12:00:00 and was on bus 1. According to the second table, bus 1 has three records, and the closest time is 2016/07/18 12:00:05 (the third record), so the user is currently at sC.

The desired output should look like this:
user         time          bus  stop
 A    2016/07/18 12:00:00   1    sC
 B    2016/07/19 12:00:00   2    sC
 C    2016/07/20 12:00:00   3    sD
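To spell out the matching rule, here is a minimal pure-Python sketch of the same computation for user A only (the variable names are just for illustration, not part of the Spark solution):

from datetime import datetime

def tm(s):
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# bus 1's records from the second table
bus1 = [(tm("2016/07/18 11:59:40"), "sA"),
        (tm("2016/07/18 11:59:50"), "sB"),
        (tm("2016/07/18 12:00:05"), "sC")]

report_time = tm("2016/07/18 12:00:00")  # user A's report time

# pick the record whose time is closest to the report time
closest = min(bus1, key=lambda rec: abs((rec[0] - report_time).total_seconds()))
print(closest[1])  # sC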

I have already converted the times to timestamps, so the only remaining problem is to find, for each user, the closest timestamp among the rows whose bus number is equal.
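For reference, a minimal sketch of that conversion, assuming the table is loaded as a DataFrame df whose time column is a string in exactly the format shown above:

from pyspark.sql import functions as F

# parse strings like "2016/07/18 12:00:00" into a real timestamp column
df = df.withColumn("time", F.unix_timestamp("time", "yyyy/MM/dd HH:mm:ss").cast("timestamp"))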

Since I'm not familiar with SQL yet, I tried to find the closest time and its stop with a map function, which means calling sqlContext.sql inside the map function, and Spark does not seem to allow that:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.



So how can I write a SQL query to get the correct output?

Best Answer

This can be done with window functions: join the two tables on the bus number, compute the absolute time difference for every user/bus-record pair, then rank the records within each (user, bus) group by that difference and keep the closest one.

from datetime import datetime

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window

def tm(s):
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# set up the sample data
userTime = [Row(user="A", time=tm("2016/07/18 12:00:00"), bus=1)]
userTime.append(Row(user="B", time=tm("2016/07/19 12:00:00"), bus=2))
userTime.append(Row(user="C", time=tm("2016/07/20 12:00:00"), bus=3))

busTime = [Row(bus=1, time=tm("2016/07/18 11:59:40"), stop="sA")]
busTime.append(Row(bus=1, time=tm("2016/07/18 11:59:50"), stop="sB"))
busTime.append(Row(bus=1, time=tm("2016/07/18 12:00:05"), stop="sC"))
busTime.append(Row(bus=2, time=tm("2016/07/19 11:59:40"), stop="sB"))
busTime.append(Row(bus=2, time=tm("2016/07/19 12:00:10"), stop="sC"))
busTime.append(Row(bus=3, time=tm("2016/07/20 11:59:55"), stop="sD"))
busTime.append(Row(bus=3, time=tm("2016/07/20 12:00:10"), stop="sE"))

# create the two DataFrames (sc is the shell's SparkContext)
userDf = sc.parallelize(userTime).toDF().alias("usertime")
busDf = sc.parallelize(busTime).toDF().alias("bustime")

# join on bus number and keep both timestamps
joinedDF = userDf.join(busDf, F.col("usertime.bus") == F.col("bustime.bus"), "inner").select(
    userDf.user,
    userDf.time.alias("user_time"),
    busDf.bus,
    busDf.time.alias("bus_time"),
    busDf.stop)

# absolute difference (in seconds) between the user's report time and each bus record
additional_cols = joinedDF.withColumn(
    "bus_time_diff",
    F.abs(F.unix_timestamp(F.col("bus_time")) - F.unix_timestamp(F.col("user_time"))))

# rank the bus records within each (user, bus) group by that difference and keep the closest one
partDf = additional_cols.select(
    "user", "user_time", "bus", "bus_time", "stop", "bus_time_diff",
    F.row_number().over(
        Window.partitionBy("user", "bus").orderBy("bus_time_diff")).alias("rank")
).filter(F.col("rank") == 1)

additional_cols.show(20, False)
partDf.show(20, False)

Output:
+----+---------------------+---+---------------------+----+-------------+
|user|user_time            |bus|bus_time             |stop|bus_time_diff|
+----+---------------------+---+---------------------+----+-------------+
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:40.0|sA  |20           |
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:50.0|sB  |10           |
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 11:59:40.0|sB  |20           |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 12:00:10.0|sE  |10           |
+----+---------------------+---+---------------------+----+-------------+
+----+---------------------+---+---------------------+----+-------------+----+
|user|user_time            |bus|bus_time             |stop|bus_time_diff|rank|
+----+---------------------+---+---------------------+----+-------------+----+
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |1   |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |1   |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |1   |
+----+---------------------+---+---------------------+----+-------------+----+
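If you want the same logic as a SQL string (as asked in the question), here is a sketch that assumes the two DataFrames are registered as temporary views named usertime and bustime, and that spark is the SparkSession (on Spark 1.x use registerTempTable and sqlContext.sql instead):

userDf.createOrReplaceTempView("usertime")
busDf.createOrReplaceTempView("bustime")

spark.sql("""
    SELECT user, user_time, bus, bus_time, stop
    FROM (
        SELECT u.user,
               u.time AS user_time,
               b.bus,
               b.time AS bus_time,
               b.stop,
               ROW_NUMBER() OVER (
                   PARTITION BY u.user, u.bus
                   ORDER BY ABS(unix_timestamp(b.time) - unix_timestamp(u.time))
               ) AS rn
        FROM usertime u
        JOIN bustime b ON u.bus = b.bus
    ) ranked
    WHERE rn = 1
""").show()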

For this question about finding the closest time between two tables in Spark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38623782/
