python - SQLAlchemy with_for_update 读取陈旧数据

标签 python sql postgresql sqlalchemy

我正在编写一个负责更新帐户余额的函数。为了防止并发更新,我先用with_for_update()锁定帐户,计算金额,更新余额,然后提交 session 。为了模拟并发请求,我生成了两个进程并在每个进程中运行一次该函数。这是计算和更新余额的代码:

session = create_db_session(db_engine)()
session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

print("&" * 80)
print(f"{process_number} entering!")
print("&" * 80)

accounts = (
    session.query(Account)
    .filter(Account.id == [some account IDs])
    .with_for_update()
    .populate_existing()
    .all()
)

print("*" * 80)
print(f"{process_number} got here!")
for account in accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
    print(hex(id(session)))
    print("*" * 80)

# Calculate the total amount outstanding by account.
for account in accounts:
    total_amount = _calculate_total_amount()
    if account.balance >= total_amount:
        # For accounts with sufficient balance, deduct the amount from the balance.
        account.balance -= total_amount
    else:
        # Otherwise, save them for notification. Code omitted.

print("-" * 80)
print(f"{process_number} committing!")
for li, account in line_items_accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
    print("-" * 80)
session.commit()
这是输出:
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
0 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
1 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
********************************************************************************
0 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65d7e0d0
********************************************************************************
--------------------------------------------------------------------------------
0 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
********************************************************************************
1 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65f930a0
********************************************************************************
--------------------------------------------------------------------------------
1 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
0和1是进程号,十六进制数是 session 的id。你可以看到锁起作用了(进程 0 阻塞了 1 直到 0 提交),但是 1 读取了陈旧的数据:余额应该是 19930.01 ,而不是 20000 ,并且在进程 1 的输出中,“帐户版本”应该是 2,而不是 1。
我试过使用 populate_existing()没有运气,尽管我怀疑它无论如何都不会有帮助,因为这两个 session 是不同的,并且进程 1 的 session 不应该填充任何内容,直到进程 0 释放锁。我也尝试过“可重复read"和 "serializable"隔离级别,并且由于事务之间的并发更新/读/写依赖关系,期望在进程 1 中引发异常,但没有发生任何事情。
值得注意的是,行为并不一致。当我在本地运行上面的代码块时,一切正常,但当我用所有代码构建一个 Docker 容器并在那里运行它时,几乎从来没有工作过。软件包版本没有区别。我正在使用 Postgres 和 psycopg2。
我现在正用头撞墙,试图弄清楚发生了什么。我觉得也许我忽略了一些简单的事情。有任何想法吗?

最佳答案

FOR UPDATE会成功的。 The manual:

FOR UPDATE causes the rows retrieved by the SELECT statement to be locked as though for update. This prevents them from being locked, modified or deleted by other transactions until the current transaction ends. That is, other transactions that attempt UPDATE, DELETE, SELECT FOR UPDATE, SELECT FOR NO KEY UPDATE, SELECT FOR SHARE or SELECT FOR KEY SHARE of these rows will be blocked until the current transaction ends;


大胆强调我的。
这正是 SQLAlchemy 的 with_for_update()做。 The manual:

When called with no arguments, the resulting SELECT statement will have a FOR UPDATE clause appended.


然而 ,这是使用 SERIALIZABLE 操作时的多余工作像您一样进行快照隔离。 The manual:

This level emulates serial transaction execution for all committed transactions; as if transactions had been executed one after another, serially, rather than concurrently.


因此,您的代码对竞争条件是安全的,冗余。 任一个 使用 FOR UPDATE (推荐!),使用 SERIALIZABLE交易。后者通常要贵得多。并且您需要为序列化失败做好准备(不在您显示的代码中)。 The manual:

... like the Repeatable Read level, applications using this level must be prepared to retry transactions due to serialization failures.


房间里的大象:你真的写过数据库吗? session.commit()过早打印“任务完成”后可能会失败。
检查数据库日志以了解序列化失败或任何其他异常。如果您(不出所料)发现序列化失败,简单的 解决方案是切换到(默认!)READ COMMITED隔离级别。您的手动锁定已经完成了这项工作。

关于python - SQLAlchemy with_for_update 读取陈旧数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65402068/

相关文章:

sql - 选择与列表中所有项目匹配的行组

java - jOOQ - MySQL 多行插入...使用 VALUES() 函数进行重复键更新

javascript - 需要解决这个 javascript 编码问题

ruby-on-rails - 查询整数成员的 jsonb 数组

java - Eclipse/Hibernate 工具错误 : Archive classpath entry doesn't exist

python - Sympy 找不到 sinh (t) 的拉普拉斯变换

python - 检测并发中失败的任务。futures

r - 使用 R 通过 SSL 连接到 Postgres

来自参数字符串的 Python Optparse

python - Python 类可以应用双向重载吗?