hadoop - 通过pyspark更新Hive中的插入数据

标签 hadoop pyspark hive apache-spark-sql pyspark-dataframes

我的数据源不断变化。我正在通过sqoop提取数据,但是由于数据量很大,我无法将其保留为每日截断负载。我想附加数据,但是逻辑应该更新并插入。如果通过删除先前的相同记录在源中更新记录,则应在配置单元中执行相同的操作,即应删除旧记录并插入/更新新记录。
下面是一个这样的例子。
enter image description here
在说了30分钟后,数据将像这样更新:
enter image description here
现在,我的配置单元表选择了原始记录,并在一段时间后选择了更新的记录,但将其插入到另一行中。
enter image description here
我希望数据被反射(reflect)为与源中的数据相同,而不覆盖我的表。
(建议使用Pyspark代码)
请帮忙。谢谢。

最佳答案

不提供查询,但给出了如何实现此目的的想法:
在源和您的实际配置单元表之间创建一个临时表,该表将具有所有记录(插入和更新)。
要获取实际的配置单元表,请使用等级函数,例如:

rank() over (partition by id order by ingested_ts desc) as rnk
---
---
where rnk = 1
enter image description here
注意:根据您的数据量,hive_staging表可能会增长,因此您需要相应地添加分区/存储桶。

关于hadoop - 通过pyspark更新Hive中的插入数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62486908/

相关文章:

hadoop - 更正配置单元脚本

hadoop - pig :过滤出关系中的最后一个元组

hadoop - Sqoop Hive 以状态 1 退出

apache-spark - 如何在 Spark 中过滤具有特定条件的数据帧

apache-spark - 如何在 macOS Mojave 上使用 Pandas UDF? (由于 [__NSPlaceholderDictionary initialize] 可能正在进行中而失败...)

windows - 如何在 Windows 10 上运行 Spark Streaming 应用程序?

hive -如何查看在metastore中创建的表?

java - Hadoop Docker设置-WordCount教程

insert - cassandra hive插入空异常

hive - Hive 中的数组字面量