apache-spark - 计算pyspark中每组成对连续行之间的时间差

标签 apache-spark pyspark apache-spark-sql pyspark-sql

我想计算每个用户每个 SeqID 花费的时间。我有一个这样的数据框。 但是,时间被分配给每个用户的两个操作,Action_A 和 Action_B。 每个用户、每个 seqID 的总时间将是所有此类对的总和

对于第一个用户,它是 5 + 3 [(2019-12-10 10:00:00 - 2019-12-10 10:05:00) + (2019-12-10 10:20: 00 - 2019-12-10 10:23:00)]

因此,第一个用户理想情况下为 SeqID 1 花费了 8 分钟(而不是 23 分钟)。

同样,用户 2 花费了 1 + 5 = 6 分钟

如何使用 pyspark 进行计算?

data = [(("ID1", 15, "2019-12-10 10:00:00", "Action_A")), 
        (("ID1", 15, "2019-12-10 10:05:00", "Action_B")),
        (("ID1", 15, "2019-12-10 10:20:00", "Action_A")),
        (("ID1", 15, "2019-12-10 10:23:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:10:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:11:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:30:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:35:00", "Action_B"))]
df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])
df.show()

+---+-----+-------------------+--------+
| ID|SeqID|          Timestamp|  Action|
+---+-----+-------------------+--------+
|ID1|   15|2019-12-10 10:00:00|Action_A|
|ID1|   15|2019-12-10 10:05:00|Action_B|
|ID1|   15|2019-12-10 10:20:00|Action_A|
|ID1|   15|2019-12-10 10:23:00|Action_B|
|ID2|   23|2019-12-10 11:10:00|Action_A|
|ID2|   23|2019-12-10 11:11:00|Action_B|
|ID2|   23|2019-12-10 11:30:00|Action_A|
|ID2|   23|2019-12-10 11:35:00|Action_B|
+---+-----+-------------------+--------+

一旦我有了每对的数据,我就可以对整个组(ID,SeqID)求和

预期输出(也可能是秒)

+---+-----+--------+
| ID|SeqID|Dur_Mins|
+---+-----+--------+
|ID1|   15|       8|
|ID2|   23|       6|
+---+-----+--------+

最佳答案

这是一个可能的解决方案,使用 Higher-Order Functions (Spark >=2.4):

transform_expr = "transform(ts_array, (x,i) -> (unix_timestamp(ts_array[i+1]) - unix_timestamp(x))/60 * ((i+1)%2))"

df.groupBy("ID", "SeqID").agg(array_sort(collect_list(col("Timestamp"))).alias("ts_array")) \
    .withColumn("transformed_ts_array", expr(transform_expr)) \
    .withColumn("Dur_Mins", expr("aggregate(transformed_ts_array, 0D, (acc, x) -> acc + coalesce(x, 0D))")) \
    .drop("transformed_ts_array", "ts_array") \
    .show(truncate=False)

步骤:

  1. 收集每个组ID的所有时间戳到数组,SeqID并按升序排列
  2. 使用 lambda 函数 (x, i) => Double 对数组应用转换。其中 x 是实际元素,i 是它的索引。对于数组中的每个时间戳,我们计算与下一个时间戳的差异。我们乘以 (i+1)%2 以便只有 diff 作为每 2 对 2(第一个与第二个,第三个与第四个,......)因为总是有 2 Action 。
  3. 最后,我们聚合转换的结果数组以求和所有元素。

输出:

+---+-----+--------+
|ID |SeqID|Dur_Mins|
+---+-----+--------+
|ID1|15   |8.0     |
|ID2|23   |6.0     |
+---+-----+--------+

关于apache-spark - 计算pyspark中每组成对连续行之间的时间差,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59265546/

相关文章:

python - partitionBy 分配分区,但每个分区中的 WHERE

java - 尝试使用 Apache Spark Java API 透视表

amazon-web-services - 如何在Amazon Spark集群上启动4个实例?

apache-spark - 文件已存在错误从数据帧写入新文件

python - 如何在新的 Spark session 中再次读取 Spark 表?

python - Spark DataFrame mapPartitions

python - 合并 PysPark 中的重叠区间

apache-spark - Spark SQL 广播哈希连接

scala - 将行或列转换为数据框

scala - Apache Spark : How do I convert a Spark DataFrame to a RDD with type RDD[(Type1, Type2,...)]?