我正在 PYSPARK 数据框中进行一周多的分组,并收集连续两个周数之间新的不同 ID。
我尝试在一周内进行groupby列,然后聚合数据帧上的collect_set方法来获取所有可用的ID,然后我逐一比较列表以获取两个连续列表之间按周数排列的不同 ID,如下所示:
输入:
输出:
这里的问题是,由于我的情况下有大量 ID(超过 900 万个 ID),我认为 Spark session 由于内存不足错误而被终止(错误 500)!
是否有其他解决方案可以使用 PYSPARK 获取连续两周内新的不同 ID 的列表?
最佳答案
为了扩展,您需要按 ID 进行聚合,而不收集任何结果。尝试以下方法:
import pyspark.sql.functions as F
from pyspark.sql import Window
data = spark.createDataFrame([(1, "ID_1"), (1, "ID_2"), (1, "ID_3"),
(2, "ID_1"), (2, "ID_4"), (2, "ID_5"),
(3, "ID_6")], ["Week", "ID"])
win = Window.partitionBy('ID').orderBy('Week')
agg_data = (
data
.withColumn("prevWeek", F.lag("Week", offset=1).over(win))
.withColumn("isInPrevWeek",
F.col("prevWeek").isNotNull() & ((F.col("Week") - F.col("prevWeek")) == 1))
.filter(~F.col("isInPrevWeek"))
.groupBy("Week")
.agg(F.count("*").alias("newIDs"),
F.array_sort(F.collect_list("ID")).alias("showNewIDs")) # Remove in production
.orderBy("Week")
)
agg_data.show()
首先,函数 lag
使用窗口函数创建前一周的新列,该窗口函数允许单独考虑每个 ID(分区)并按时间顺序对周进行排序。由于 Spark 任务是由 ID 组组成的,因此可以很好地扩展。
然后,isInPrevWeek
检查该 ID 是否确实是上周的。如果是,则该记录被过滤掉。现在,您只需按周统计重新挖矿的 ID 即可。
+----+------+------------------+
|Week|newIDs| showNewIDs|
+----+------+------------------+
| 1| 3|[ID_1, ID_2, ID_3]|
| 2| 2| [ID_4, ID_5]|
| 3| 1| [ID_6]|
+----+------+------------------+
请注意,该代码段收集 ID 仅用于说明目的,但计算并非必需。
关于python - 如何使用 pyspark 收集两个连续日期之间的新 ID 列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67121704/