python - 每次 Spark 窗口函数

标签 python apache-spark pyspark apache-spark-sql

我有一些具有以下结构的数据框:

ID| Page    |   User          |    Timestamp      |
|1|Page 1   |Ericd            |2002-09-07 19:39:55|
|1|Page 1   |Liir             |2002-10-12 03:01:42|
|1|Page 1   |Tubby            |2002-10-12 03:02:23|
|1|Page 1   |Mojo             |2002-10-12 03:18:24|
|1|Page 1   |Kirf             |2002-10-12 03:19:03|
|2|Page 2   |The Epopt        |2001-11-28 22:27:37|
|2|Page 2   |Conversion script|2002-02-03 01:49:16|
|2|Page 2   |Bryan Derksen    |2002-02-25 16:51:15|
|2|Page 2   |Gear             |2002-10-04 12:46:06|
|2|Page 2   |Tim Starling     |2002-10-06 08:13:42|
|2|Page 2   |Tim Starling     |2002-10-07 03:00:54|
|2|Page 2   |Salsa Shark      |2003-03-18 01:45:32|

我想找到一段时间内访问页面的用户数量(例如每个月)。例如,2002 年第 10 个月的结果将为

|1|Page 1   |Liir             |2002-10-12 03:01:42| 
|1|Page 1   |Tubby            |2002-10-12 03:02:23|
|1|Page 1   |Mojo             |2002-10-12 03:18:24|
|1|Page 1   |Kirf             |2002-10-12 03:19:03|
|2|Page 2   |Gear             |2002-10-04 12:46:06|
|2|Page 2   |Tim Starling     |2002-10-06 08:13:42|
|2|Page 2   |Tim Starling     |2002-10-07 03:00:54|

和页数:

              numberOfUsers (in October 2002)
|1|Page 1   |      4
|2|Page 2   |      3 

问题还在于如何将这种逻辑应用于每年的每个月。我想出了如何查找最近 n 天发生的事件

days = lambda i: i * 86400
window = (Window().partitionBy(col("page"))
          .orderBy(col("timestamp").cast("timestamp").cast("long")).rangeBetween(-days(30), 0))

df = df.withColumn("monthly_occurrences", func.count("user").over(window))
df.show()

我会感激一些建议

最佳答案

您可以首先创建包含年月组合的列,然后使用该列进行分组。所以一个有效的例子是:

import pyspark.sql.functions as F

df = sc.parallelize([
    ('2018-06-02T00:00:00.000Z','tim', 'page 1' ),
    ('2018-07-20T00:00:00.000Z','tim', 'page 1' ),
    ('2018-07-20T00:00:00.000Z','john', 'page 2' ),
    ('2018-07-20T00:00:00.000Z','john', 'page 2' ),
    ('2018-08-20T00:00:00.000Z','john', 'page 2' )
]).toDF(("datetime","user","page" ))

df = df.withColumn('yearmonth',F.concat(F.year('datetime'),F.lit('-'),F.month('datetime')))    
df_agg = df.groupBy('yearmonth','page').count()
df_agg.show()

输出:

+---------+------+-----+
|yearmonth|  page|count|
+---------+------+-----+
|   2018-7|page 2|    2|
|   2018-6|page 1|    1|
|   2018-7|page 1|    1|
|   2018-8|page 2|    1|
+---------+------+-----+

希望这有帮助!

关于python - 每次 Spark 窗口函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51236909/

相关文章:

python - Tensorflow: session 之间正确的队列关闭

python - 无法使用 selenium-webdriver 设置复选框

hadoop - Apache Spark S3 错误

java - 使用 Spark 查询存储在 HDFS 中的数据的最佳方法是什么?

python - 无法创建 Spark session

apache-spark - PySpark 在 2 个数据帧上应用函数,并在小型硬件上写入数十亿行的 csv

python - 如何使用来自不同数据帧的条件将列添加到 pyspark 数据帧

python - 如何在 virtualenv 中使用 pypy 安装 lxml

python - opencv+python : Assertion failure when findcontours

java - 如何使用 Apache spark 计算平均值?