我想计算每个用户每个 SeqID 花费的时间。我有一个这样的数据框。
但是,时间被分配给每个用户的两个操作,Action_A 和 Action_B。
每个用户、每个 seqID 的总时间将是所有此类对的总和
对于第一个用户,它是 5 + 3 [(2019-12-10 10:00:00 - 2019-12-10 10:05:00) + (2019-12-10 10:20: 00 - 2019-12-10 10:23:00)]
因此,第一个用户理想情况下为 SeqID 1 花费了 8 分钟
(而不是 23 分钟
)。
同样,用户 2 花费了 1 + 5 = 6 分钟
如何使用 pyspark 进行计算?
data = [(("ID1", 15, "2019-12-10 10:00:00", "Action_A")),
(("ID1", 15, "2019-12-10 10:05:00", "Action_B")),
(("ID1", 15, "2019-12-10 10:20:00", "Action_A")),
(("ID1", 15, "2019-12-10 10:23:00", "Action_B")),
(("ID2", 23, "2019-12-10 11:10:00", "Action_A")),
(("ID2", 23, "2019-12-10 11:11:00", "Action_B")),
(("ID2", 23, "2019-12-10 11:30:00", "Action_A")),
(("ID2", 23, "2019-12-10 11:35:00", "Action_B"))]
df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])
df.show()
+---+-----+-------------------+--------+
| ID|SeqID| Timestamp| Action|
+---+-----+-------------------+--------+
|ID1| 15|2019-12-10 10:00:00|Action_A|
|ID1| 15|2019-12-10 10:05:00|Action_B|
|ID1| 15|2019-12-10 10:20:00|Action_A|
|ID1| 15|2019-12-10 10:23:00|Action_B|
|ID2| 23|2019-12-10 11:10:00|Action_A|
|ID2| 23|2019-12-10 11:11:00|Action_B|
|ID2| 23|2019-12-10 11:30:00|Action_A|
|ID2| 23|2019-12-10 11:35:00|Action_B|
+---+-----+-------------------+--------+
一旦我有了每对的数据,我就可以对整个组(ID,SeqID)求和
预期输出(也可能是秒)
+---+-----+--------+
| ID|SeqID|Dur_Mins|
+---+-----+--------+
|ID1| 15| 8|
|ID2| 23| 6|
+---+-----+--------+
最佳答案
这是一个可能的解决方案,使用 Higher-Order Functions (Spark >=2.4):
transform_expr = "transform(ts_array, (x,i) -> (unix_timestamp(ts_array[i+1]) - unix_timestamp(x))/60 * ((i+1)%2))"
df.groupBy("ID", "SeqID").agg(array_sort(collect_list(col("Timestamp"))).alias("ts_array")) \
.withColumn("transformed_ts_array", expr(transform_expr)) \
.withColumn("Dur_Mins", expr("aggregate(transformed_ts_array, 0D, (acc, x) -> acc + coalesce(x, 0D))")) \
.drop("transformed_ts_array", "ts_array") \
.show(truncate=False)
步骤:
- 收集每个组
ID
的所有时间戳到数组,SeqID
并按升序排列 - 使用 lambda 函数
(x, i) => Double
对数组应用转换。其中x
是实际元素,i
是它的索引。对于数组中的每个时间戳,我们计算与下一个时间戳的差异。我们乘以(i+1)%2
以便只有 diff 作为每 2 对 2(第一个与第二个,第三个与第四个,......)因为总是有 2 Action 。 - 最后,我们聚合转换的结果数组以求和所有元素。
输出:
+---+-----+--------+
|ID |SeqID|Dur_Mins|
+---+-----+--------+
|ID1|15 |8.0 |
|ID2|23 |6.0 |
+---+-----+--------+
关于apache-spark - 计算pyspark中每组成对连续行之间的时间差,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59265546/