python - 如何使用 pyspark 收集两个连续日期之间的新 ID 列表

标签 python apache-spark pyspark

我正在 PYSPARK 数据框中进行一周多的分组,并收集连续两个周数之间新的不同 ID。

我尝试在一周内进行groupby列,然后聚合数据帧上的collect_set方法来获取所有可用的ID,然后我逐一比较列表以获取两个连续列表之间按周数排列的不同 ID,如下所示:

输入:

<表类=“s-表”> <标题> 周 ID <正文> 1 ID_1 1 ID_2 1 ID_3 2 ID_1 2 ID_4 2 ID_5 3 ID_6

输出:

<表类=“s-表”> <标题> 周 ID_List diff_list new_ different_ID_count <正文> 1 ID_1、ID_2、ID_3 ID_1、ID_2、ID_3 - 2 ID_1、ID_4、ID_5 ID_4、ID_5 2 3 ID_2、ID_6 ID_6 1

这里的问题是,由于我的情况下有大量 ID(超过 900 万个 ID),我认为 Spark session 由于内存不足错误而被终止(错误 500)!

是否有其他解决方案可以使用 PYSPARK 获取连续两周内新的不同 ID 的列表?

最佳答案

为了扩展,您需要按 ID 进行聚合,而不收集任何结果。尝试以下方法:

import pyspark.sql.functions as F
from pyspark.sql import Window

data = spark.createDataFrame([(1, "ID_1"), (1, "ID_2"), (1, "ID_3"),
                              (2, "ID_1"), (2, "ID_4"), (2, "ID_5"),
                              (3, "ID_6")], ["Week", "ID"])

win = Window.partitionBy('ID').orderBy('Week')

agg_data = (
  data
  .withColumn("prevWeek", F.lag("Week", offset=1).over(win))
  .withColumn("isInPrevWeek", 
              F.col("prevWeek").isNotNull() & ((F.col("Week") - F.col("prevWeek")) == 1))
  .filter(~F.col("isInPrevWeek"))
  .groupBy("Week")
  .agg(F.count("*").alias("newIDs"),
       F.array_sort(F.collect_list("ID")).alias("showNewIDs"))  # Remove in production
  .orderBy("Week")
)

agg_data.show()

首先,函数 lag 使用窗口函数创建前一周的新列,该窗口函数允许单独考虑每个 ID(分区)并按时间顺序对周进行排序。由于 Spark 任务是由 ID 组组成的,因此可以很好地扩展。

然后,isInPrevWeek 检查该 ID 是否确实是上周的。如果是,则该记录被过滤掉。现在,您只需按周统计重新挖矿的 ID 即可。

+----+------+------------------+
|Week|newIDs|        showNewIDs|
+----+------+------------------+
|   1|     3|[ID_1, ID_2, ID_3]|
|   2|     2|      [ID_4, ID_5]|
|   3|     1|            [ID_6]|
+----+------+------------------+

请注意,该代码段收集 ID 仅用于说明目的,但计算并非必需。

关于python - 如何使用 pyspark 收集两个连续日期之间的新 ID 列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67121704/

相关文章:

scala - 为什么在显示运算符之后无法加入?

apache-spark - 仅保留 DataFrame 中有关某些字段的重复项

hadoop - 如何在pyspark中更改DataFrame的HDFS block 大小

python - 国家仪器长监控Python

python - 获取导致 IndexError 异常的索引

apache-spark - 如何在 SparkR 中进行 map 和 reduce

python - Databricks 连接测试在 "The system cannot find the path specified."上无限期挂起

python - 如何更改 QTreeView 中特定分支的颜色?

Python 日志记录 : disable stack trace

java - SparkLauncher 未启动应用程序