我正在尝试查询 MySql 数据库表的一个子集,将结果提供给 Pandas DataFrame,更改一些数据,然后将更新的行写回同一个表。我的表大小约为 1MM 行,我要更改的行数将相对较小(<50,000),因此带回整个表并执行 df.to_sql(tablename,engine, if_exists='replace' )
不是一个可行的选择。是否有一种直接的方法来更新已更改的行,而无需遍历 DataFrame 中的每一行?
我知道这个项目试图模拟“upsert”工作流程,但它似乎只完成了插入新的非重复行的任务,而不是更新现有行的部分内容:
这是我试图在更大范围内完成的工作的框架:
import pandas as pd
from sqlalchemy import create_engine
import threading
#Get sample data
d = {'A' : [1, 2, 3, 4], 'B' : [4, 3, 2, 1]}
df = pd.DataFrame(d)
engine = create_engine(SQLALCHEMY_DATABASE_URI)
#Create a table with a unique constraint on A.
engine.execute("""DROP TABLE IF EXISTS test_upsert """)
engine.execute("""CREATE TABLE test_upsert (
A INTEGER,
B INTEGER,
PRIMARY KEY (A))
""")
#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)
#Alter row where 'A' == 2
df_in_db.loc[df_in_db['A'] == 2, 'B'] = 6
现在我想将 df_in_db
写回我的 'test_upsert'
表,并反射(reflect)更新后的数据。
这个 SO 问题非常相似,其中一条评论建议使用“sqlalchemy 表类”来执行任务。
Update table using sqlalchemy table class
如果这是最好的(唯一的?)实现方式,谁能详细说明我将如何针对我的上述具体案例实现该实现方式?
最佳答案
我认为最简单的方法是:
首先删除那些将被“更新”的行。这可以在一个循环中完成,但它对于更大的数据集(5K+ 行)不是很有效,所以我将 DF 的这个片段保存到一个临时的 MySQL 表中:
# assuming we have already changed values in the rows and saved those changed rows in a separate DF: `x`
x = df[mask] # `mask` should help us to find changed rows...
# make sure `x` DF has a Primary Key column as index
x = x.set_index('a')
# dump a slice with changed rows to temporary MySQL table
x.to_sql('my_tmp', engine, if_exists='replace', index=True)
conn = engine.connect()
trans = conn.begin()
try:
# delete those rows that we are going to "upsert"
engine.execute('delete from test_upsert where a in (select a from my_tmp)')
trans.commit()
# insert changed rows
x.to_sql('test_upsert', engine, if_exists='append', index=True)
except:
trans.rollback()
raise
PS 我没有测试这段代码,所以它可能有一些小错误,但它应该给你一个想法......
关于python - 如何使用 Pandas DataFrame 对数据库表的现有行执行更新?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42461959/