python - Sqlite3 内存中多线程问题 - 阻塞的线程立即失败

标签 python python-3.x multithreading sqlite

我正在尝试创建内存中的 sqlite3 缓存来存储 oauth token ,但遇到了有关多线程的问题。运行几次测试后,我注意到其行为与非内存数据库和多线程有很大不同。

值得注意的是,如果写入线程在未提交的情况下进行写入,则读取线程会立即失败并显示“表已锁定”。即使使用 isolation_level=None,多线程也是如此。

这不仅仅是读取器在事务完成之前被阻塞,而是立即失败,无论 timeoutPRAGMA busy_timeout = 10000

让它工作的唯一方法是设置isolation_level=None并执行PRAGMA read_uncommissed=TRUE。不过,我宁愿不这样做。

是否可以让读取线程简单地等待锁定而不是立即失败?


import sqlite3
import threading

def get_conn(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None):
    uri = 'file:%s' % name
    if is_memory:
        uri = uri + '?mode=memory&cache=shared'
    conn = sqlite3.connect(uri, uri=True, timeout=timeout, isolation_level=isolation_level)
    if pragmas is None:
        pragmas = []
    if not isinstance(pragmas, list):
        pragmas = [pragmas]
    for pragma in pragmas:
        conn.execute(pragma)
    return conn


def work1(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1):
    conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
    for i in range(loops):
        conn.execute('INSERT INTO foo VALUES (1)')


def work2(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1):
    conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
    for i in range(loops):
       len(conn.execute('SELECT * FROM foo').fetchall())


def main(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1, num_threads=16):
    conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
    try:
        conn.execute('CREATE TABLE foo(a int)')
    except sqlite3.OperationalError:
        conn.execute('DROP TABLE foo')
        conn.execute('CREATE TABLE foo(a int)')
    threads = []
    for i in range(num_threads):
        threads.append(threading.Thread(target=work1, args=(name, is_memory, timeout, isolation_level, pragmas, loops)))
        threads.append(threading.Thread(target=work2, args=(name, is_memory, timeout, isolation_level, pragmas, loops)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

# In-Memory Tests
# All of these fail immediately with table is locked. There is no delay; timeout/busy_timeout has no effect.
main('a', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=None)
main('b', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=None)
main('c', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=None)
main('d', is_memory=True, timeout=5, isolation_level=None, pragmas=None)
main('e', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000'])
main('f', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000'])
main('g', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000'])
main('h', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000'])
main('i', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('j', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('k', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE'])
# This is the only successful operation, when isolation_level = None and PRAGMA read_uncommitted=TRUE
main('l', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'])
# These start to take a really long time
main('m', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100)
main('n', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128)

# None of the on disk DB's ever fail:
main('o', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None)
main('p', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=None)
main('q', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=None)
main('r', is_memory=False, timeout=5, isolation_level=None, pragmas=None)
main('s', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000'])
main('t', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000'])
main('u', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000'])
main('v', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000'])
main('w', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('x', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('y', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('z', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'])
# These actually fail with database is locked
main('aa', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100)
main('ab', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128)

最佳答案

我不认为 SQLite3 接口(interface)是可重入的。我认为每个线程都必须获取互斥体,执行查询,然后释放互斥体。尝试一次仅执行一项数据库操作。 (Python 的 API 层不会执行此操作,因为通常不需要任何此类操作。)

关于python - Sqlite3 内存中多线程问题 - 阻塞的线程立即失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53769548/

相关文章:

python - Pandas 将系列分配给多索引的新列

python - vim 作为 python ide

javascript - 将表示为 numpy 数组的音频数据从 python 发送到 Javascript

html - 谁能告诉我如何在 Python 上提取和显示图像中的文本

java - 在参数范围内的线程之间分配工作

遍历目录时的 C 编程段错误

c# - 奇怪的多线程索引越界问题

python - 使用对象检测时出现 Open cv 错误

python - Microsoft Azure 授权过程中出现错误如何处理?

arrays - 解析 JSON 来做数学题?