我的数据源不断变化。我正在通过sqoop提取数据,但是由于数据量很大,我无法将其保留为每日截断负载。我想附加数据,但是逻辑应该更新并插入。如果通过删除先前的相同记录在源中更新记录,则应在配置单元中执行相同的操作,即应删除旧记录并插入/更新新记录。
下面是一个这样的例子。
在说了30分钟后,数据将像这样更新:
现在,我的配置单元表选择了原始记录,并在一段时间后选择了更新的记录,但将其插入到另一行中。
我希望数据被反射(reflect)为与源中的数据相同,而不覆盖我的表。
(建议使用Pyspark代码)
请帮忙。谢谢。
最佳答案
不提供查询,但给出了如何实现此目的的想法:
在源和您的实际配置单元表之间创建一个临时表,该表将具有所有记录(插入和更新)。
要获取实际的配置单元表,请使用等级函数,例如:
rank() over (partition by id order by ingested_ts desc) as rnk
---
---
where rnk = 1
注意:根据您的数据量,hive_staging表可能会增长,因此您需要相应地添加分区/存储桶。
关于hadoop - 通过pyspark更新Hive中的插入数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62486908/