我通常使用一个函数来生成 upsert 语句以将数据帧(逐行)馈送到 Postgres。这似乎按预期工作,但我注意到 SERIAL 列分配了一个新编号。这是需要的还是我应该改变一些东西?所以我最早的行(例如第 1 行)现在可能是第 30,128 行或其他行。
def create_update_query(final_columns, primary_key, table):
"""This creates an UPSERT statement to replace values if there is a conflict with the primary key"""
columns = ', '.join([f'{col}' for col in final_columns])
constraint = ', '.join([f'{col}' for col in primary_key])
placeholder = ', '.join([f'%({col})s' for col in final_columns])
updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in final_columns])
query = f"""INSERT INTO {table} ({columns})
VALUES ({placeholder})
ON CONFLICT ({constraint})
DO UPDATE SET {updates};"""
query.split()
query = ' '.join(query.split())
return query
- final_columns = 数据框 (df.columns),它不包含 row_key 或 insert_timestamp(这两者都是由 postgres 创建的)
- primary_key 是 postgres 中用于 PRIMARY KEY 以防止重复的列
示例输出查询:
'INSERT INTO example.fact_table (report_date, employee_id, state_count, state_time) VALUES (%(report_date)s, %(employee_id)s, %(state_count)s, %(state_time)s) ON CONFLICT (report_date, employee_id) DO UPDATE SET report_date = EXCLUDED.report_date, employee_id = EXCLUDED.employee_id, state_count = EXCLUDED.state_count, state_time = EXCLUDED.state_time;'
这是通过以下方式提供给数据库的:
for row in insert_values:
cursor.execute(create_update_query(final_columns, primary_key, table), row)
conn.commit()
最佳答案
当您说SERIAL 列分配了一个新编号时,我相信您的意思是即使存在冲突,序列值也会增加,增加冲突行的数量。
如果是这样,这是预期的行为 - 请参阅 here .
处理此问题的一种方法是首先检查是否存在冲突情况(与 CTE),然后根据需要进行插入或更新。
这不会导致序列号因冲突而增加,但它不再是传统意义上的upsert(这不是问题)。
关于postgresql - 更新插入更改行键(串行)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46709354/