python - Pyspark - 如何将 '4 hours' 多个窗口分组聚合

标签 python apache-spark pyspark

我有一个数据集如下:

id  email   Date_of_purchase    time_of_purchase
1   abc@gmail.com   11/10/18    12:10 PM
2   abc@gmail.com   11/10/18    02:11 PM
3   abc@gmail.com   11/10/18    03:14 PM
4   abc@gmail.com   11/11/18    06:16 AM
5   abc@gmail.com   11/11/18    09:10 AM
6   def@gmail.com   11/10/18    12:17 PM
7   def@gmail.com   11/10/18    03:24 PM
8   def@gmail.com   11/10/18    08:16 PM
9   def@gmail.com   11/10/18    09:13 PM
10  def@gmail.com   11/11/18    12:01 AM

我想计算每个电子邮件 ID 在 4 小时内进行的交易数量。例如,电子邮件 ID:abc@gmail.com 从 11/10/18 12.10 PM 到 11/10/18 4.10 PM 进行了 3 笔交易,从 11/11/18 6.16 AM 到 11/11/18 进行了 2 笔交易上午 10.16。电子邮件 ID:def@gmail.com 从 11/10/18 12.17 PM 到 11/10/18 4.17 PM 进行了 2 笔交易,从 11/10/18 8.16 PM 到 11/11/18 12.16 AM 进行了 3 笔交易。

我想要的输出是:

 email          hour_interval                           purchase_in_4_hours
abc@gmail.com   [11/10/18 12.10 PM to 11/10/18 4.10 PM] 3
abc@gmail.com   [11/11/18 6.16 AM to 11/11/18 10.16 AM] 2
def@gmail.com   [11/10/18 12.17 PM to 11/10/18 4.17 PM] 2
def@gmail.com   [11/10/18 8.16 PM to 11/11/18 12.16 AM] 3

我的数据集有 1000k 行。我对 Spark 很陌生。任何帮助将不胜感激。 附:时间间隔可以从 4 小时更改为 1 小时、6 小时、1 天等。

TIA。

最佳答案

其想法是通过电子邮件对数据进行分区,按日期和时间在每个分区内进行排序,然后将每个分区映射到所需的输出。如果每个分区的数据(=一个电子邮件地址的数据)适合一个 Spark 执行器的内存,则此方法将起作用。

实际的 Spark 逻辑遵循以下步骤

  1. 创建一个包含时间戳的新列
  2. 按电子邮件对数据进行分区,以便具有相同电子邮件的所有行都属于同一分区。请注意,一个分区中可能存在来自多封电子邮件的数据。
  3. 按电子邮件和时间戳对每个分区进行排序。
  4. 处理每个分区。如有必要,根据需要为每个分区生成多个输出
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("test").getOrCreate()
df = spark.read.option("header", "true").csv(<path>) #or any other data source
df = df.withColumn("date_time", to_timestamp(concat(col("Date_of_purchase"), lit(" "), col("time_of_purchase")), "MM/dd/yy hh:mm aa")) \
    .drop("Date_of_purchase", "time_of_purchase") \
    .repartition(col("email")) \
    .sortWithinPartitions(col("email"), col("date_time"))

def process_partition(df_chunk):
    row_list = list(df_chunk)
    if len(row_list) == 0:
        return
    email = row_list[0]['email']
    start = row_list[0]['date_time']
    end = start + timedelta(hours=4)
    count = 0
    for row in row_list:
        if email == row['email'] and end > row['date_time']:
            count = count +1
        else:
            yield Row(email, start, end, count)
            email = row['email']
            start = row['date_time']
            end = start + timedelta(hours=4)
            count = 1
    yield Row(email, start, end, count)

result = df.rdd.mapPartitions(process_partition).toDF(["email", "from", "to", "count"])
result.show()

输出:

+-------------+-------------------+-------------------+-----+
|        email|               from|                 to|count|
+-------------+-------------------+-------------------+-----+
|def@gmail.com|2018-11-10 12:17:00|2018-11-10 16:17:00|    2|
|def@gmail.com|2018-11-10 20:16:00|2018-11-11 00:16:00|    3|
|abc@gmail.com|2018-11-10 12:10:00|2018-11-10 16:10:00|    3|
|abc@gmail.com|2018-11-11 06:16:00|2018-11-11 10:16:00|    2|
+-------------+-------------------+-------------------+-----+

要更改周期长度,可以将 timedelta 设置为任何值。

关于python - Pyspark - 如何将 '4 hours' 多个窗口分组聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58221350/

相关文章:

python - pyspark reduce 方法的歧义

python - pandas 嵌套迭代的矢量化解决方案

python - 给定数据库的 ODBC 连接字符串,我如何修改我的 settings.py 以便我的 Django 应用程序连接到它?

Python 将字符串转换为日期时间以便与日期时间对象进行比较

apache-spark - Spark OutOfMemoryError

log4j - 如何摆脱 "Using Spark' s 默认 log4j 配置文件 : org/apache/spark/log4j-defaults. 属性”消息?

python - PySpark(Python 2.7): How to flatten values after reduce

python - Datastax Spark Cassandra 连接器模块导入错误

python - 如何在 Django 模型中有效存储时区?

numpy - Spark 随机森林 - 无法将 float 转换为 int 错误