本问题末尾的(遗憾的是冗长的)MWE 是从实际应用程序中删除的。它应该像这样工作:有两个表。一种包括已处理和尚未处理的数据,另一种包括处理数据的结果。启动时,我们创建一个临时表,列出所有尚未处理的数据。然后我们在该表上打开一个读取游标并从头到尾扫描它;对于每个数据,我们都会进行一些处理(在 MWE 中省略),然后使用单独的游标将结果插入到已处理数据表中。
这在自动提交模式下可以正常工作。然而,如果写操作被包装在一个事务中——并且在实际应用程序中,它必须如此,因为写操作实际上涉及多个表(除了其中一个之外的所有表都已从 MWE 中省略)——那么 COMMIT 操作具有重置临时表上的读取游标的副作用,导致已处理的行被重新处理,这不仅会阻止前进,还会导致程序崩溃并出现 IntegrityError
尝试将重复行插入到 data_out
中。如果运行 MWE,您应该会看到以下输出:
0
1
2
3
4
5
6
7
8
9
10
0
---
127 rows remaining
Traceback (most recent call last):
File "sqlite-test.py", line 85, in <module>
test_main()
File "sqlite-test.py", line 83, in test_main
test_run(db)
File "sqlite-test.py", line 71, in test_run
(row[0], b"output"))
sqlite3.IntegrityError: UNIQUE constraint failed: data_out.value
如何防止读取游标因 COMMIT 接触不相关的表而重置?
注释:模式中的所有 INTEGER 都是 ID 号;在实际应用程序中,还有几个辅助表为每个 ID 保存更多信息,并且除了 data_out
之外,写事务还会触及其中的两个或三个辅助表,具体取决于计算结果。在实际应用程序中,临时“data_todo”表可能非常大——数百万行;我之所以开始走这条路,正是因为 Python 列表太大,内存无法容纳。 MWE 的 shebang 适用于 python3,但它在 python2 下的行为完全相同(前提是解释器足够新,可以理解 b"..."
字符串)。设置PRAGMAlocking_mode = EXCLUSIVE;
和/或PRAGMAjournal_mode=WAL;
对此现象没有影响。我正在使用 SQLite 3.8.2。
#! /usr/bin/python3
import contextlib
import sqlite3
import sys
import tempfile
import textwrap
def init_db(db):
db.executescript(textwrap.dedent("""\
CREATE TABLE data_in (
origin INTEGER,
origin_id INTEGER,
value INTEGER,
UNIQUE(origin, origin_id)
);
CREATE TABLE data_out (
value INTEGER PRIMARY KEY,
processed BLOB
);
"""))
db.executemany("INSERT INTO data_in VALUES(?, ?, ?);",
[ (1, x, x) for x in range(100) ])
db.executemany("INSERT INTO data_in VALUES(?, ?, ?);",
[ (2, x, 200 - x*2) for x in range(100) ])
db.executemany("INSERT INTO data_out VALUES(?, ?);",
[ (x, b"already done") for x in range(50, 130, 5) ])
db.execute(textwrap.dedent("""\
CREATE TEMPORARY TABLE data_todo AS
SELECT DISTINCT value FROM data_in
WHERE value NOT IN (SELECT value FROM data_out)
ORDER BY value;
"""))
def test_run(db):
init_db(db)
read_cur = db.cursor()
write_cur = db.cursor()
read_cur.arraysize = 10
read_cur.execute("SELECT * FROM data_todo;")
try:
while True:
block = read_cur.fetchmany()
if not block: break
for row in block:
# (in real life, data actually crunched here)
sys.stdout.write("{}\n".format(row[0]))
write_cur.execute("BEGIN TRANSACTION;")
# (in real life, several more inserts here)
write_cur.execute("INSERT INTO data_out VALUES(?, ?);",
(row[0], b"output"))
db.commit()
finally:
read_cur.execute("SELECT COUNT(DISTINCT value) FROM data_in "
"WHERE value NOT IN (SELECT value FROM data_out)")
result = read_cur.fetchone()
sys.stderr.write("---\n{} rows remaining\n".format(result[0]))
def test_main():
with tempfile.NamedTemporaryFile(suffix=".db") as tmp:
with contextlib.closing(sqlite3.connect(tmp.name)) as db:
test_run(db)
test_main()
最佳答案
对临时表使用第二个单独的连接,它将不受其他连接上的提交的影响。
关于python - (python-)sqlite3 : prevent COMMIT from resetting read cursor on unrelated temporary table,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21995515/