我正在使用 alembic 在表中添加一个字段。
我正在添加字段 last_name
,并使用 do_some_processing
函数填充数据,该函数从其他来源加载该字段的数据。
这是表模型,我将字段 last_name
添加到模型中
class MyTable(db.Model):
__tablename__ = "my_table"
index = db.Column(db.Integer, primary_key=True, nullable=False)
age = db.Column(db.Integer(), default=0)
first_name = db.Column(db.String(100), nullable=False)
last_name = db.Column(db.String(100), nullable=False)
这是我的迁移,效果很好
# migration_add_last_name_field
op.add_column('my_table', sa.Column('last_name', sa.String(length=100), nullable=True))
values = session.query(MyTable).filter(MyTable.age == 5).all()
for value in values:
first_name = value.first_name
value.last_name = do_some_processing(first_name)
session.commit()
问题是,使用 session.query(MyTable)
会导致将来的迁移出现问题。
例如,如果我将来添加一个迁移,将字段 foo
添加到表中,并将该字段添加到 class MyTable
中,
如果我有未更新的环境,它将运行 migration_add_last_name_field
并且失败
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError)
(1054, "Unknown column 'my_table.foo' in 'field list'")
[SQL: SELECT my_table.`index` AS my_table_index, my_table.first_name AS my_table_first_name,
my_table.last_name AS my_table_last_name, my_table.foo AS my_table_foo
FROM my_table
WHERE my_table.age = %s]
[parameters: (0,)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
因为添加 foo
的迁移仅在之后运行,但 session.query(MyTable)
获取 MyTable
模型中的所有字段,包括 foo
。
我尝试在不选择所有字段的情况下进行更新,以避免选择尚未创建的字段,如下所示:
op.add_column('my_table', sa.Column('last_name', sa.String(length=100), nullable=True))
values = session.query(MyTable.last_name, MyTable.first_name).filter(MyTable.age == 0).all()
for value in values:
first_name = value.first_name
value.last_name = do_some_processing(first_name)
session.commit()
但这会导致错误:无法设置属性
我还尝试了 select *
的不同变体,但也没有成功。
正确的解决方案是什么?
最佳答案
食谱在这里描述了这个问题:data-migrations-general-techniques
这里的一些选项可能是:
- 使用单独的元数据和反射来加载前后的表
- 随后手动创建表格并仅引用您需要的列
- 最好的情况是运行仅 SQL 且不依赖于 python 级别处理的更新(在下面的示例中,我可以通过将 int 转换为字符串来完成此操作,例如
op.execute("UPDATE users SET name = CAST(user_id AS text)")
。不过我知道这并不总是可行。
以下是选项 2 的示例:
在本例中,users
表中只有一个 user_id
列,然后我将其转换为字符串以设置为新的name
专栏。
def upgrade() -> None:
metadata = sa.MetaData()
op.add_column("users", sa.Column("name", sa.String))
# New table with the added col and the other col that should exist already.
users_t = sa.Table(
"users",
metadata,
sa.Column("user_id", sa.Integer, primary_key=True),
sa.Column("name", sa.String))
user_ids = op.get_bind().execute(sa.select(users_t.c.user_id)).scalars().all()
# Slow but would work.
for user_id in user_ids:
op.execute(
users_t.update().where(
users_t.c.user_id==op.inline_literal(user_id)).values({
"name": op.inline_literal(str(user_id))}))
关于python - sqlalchemy - alembic 运行更新查询而不指定模型以避免以后的迁移冲突,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76399565/