sql - 如何对增量数据处理行号窗函数

标签 sql python-3.x apache-spark pyspark apache-spark-sql

我有一个表,它作为行号窗口函数为某些 ID 运行。 现在,每次新数据到来时,它都会被满载,并且新的行号会再次分配给它们。因此 Row Num 再次在整个数据集上运行,这是相当低效的,因为会消耗大量资源并且导致 CPU 密集型。该表每 15 到 30 分钟构建一次。我正在尝试实现相同的目标,但使用增量,然后将增量的结果添加到特定 customer_ID 的最后一个 row_count

因此,当新记录出现时,我想保存该特定记录的最大 row_num 假设 max_row_num = 4 ,现在有两个新记录用于 ID,因此增量的 row_num 是 1,2。最终输出应该是4+1和4+2什么的。所以新的行号看起来像 1,2,3,4,5,6,在前一个 Row_num 的最大值上加上 1 和 2。

我实际上想在我的 Pyspark 中实现逻辑!但我对 python 解决方案持开放态度,然后可能会转换为 pyspark DataFrame。

请帮忙并提出可能的解决方案

满载——初始表

<表类=“s-表”> <标题> 行号 customer_ID <正文> 1 ABC123 2 ABC123 3 ABC123 1 ABC125 2 ABC125 1 ABC225 2 ABC225 3 ABC225 4 ABC225 5 ABC225

增量负载

<表类=“s-表”> <标题> 行号 customer_ID <正文> 1 ABC123 2 ABC123 1 ABC125 1 ABC225 2 ABC225 1 ABC330

期望的输出

<表类=“s-表”> <标题> 行号 customer_ID <正文> 1 ABC123 2 ABC123 3 ABC123 4 ABC123 1 ABC125 2 ABC125 3 ABC125 1 ABC225 2 ABC225 3 ABC225 4 ABC225 5 ABC225 6 ABC225 1 ABC330

最佳答案

如果您尝试插入具有新行号的值,您可以加入最大现有行号:

insert into full (row_num, customer_id)
    select i.row_number + coalesce(f.max_row_number, 0), i.customer_id
    from incremental i left join
         (select f.customer_id, max(row_number) as max_row_number
          from full f
          group by f.customer_id
         ) f
         on i.customer_id = f.customer_id;

关于sql - 如何对增量数据处理行号窗函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69243112/

相关文章:

apache-spark - 发送Spark流指标以打开tsdb

sql - PostgreSQL 中大写名称的正则表达式检查

mysql - 如何将mysql数据库同步到外部数据源

javascript - 在 Python 中使用 java 脚本抓取网页

Python 类变量更改未保存

python - 如何在 Spark RDD 中比较不区分大小写的字符串?

sql - 将行连接成列

SQL 开发人员-GIT

python - 试图在 python 中创建一个菜单,但循环不会退出

scala - 如何使用 udf 将空列添加到 Spark 中的复杂数组结构