python - Django:如何在事务中包装批量更新/插入操作?

标签 python sql django transactions django-database

这是我的用例:

  • 我有多个并行运行的 celery 任务
  • 每个任务都可以批量创建更新许多对象。为此,我使用 django-bulk

所以基本上我使用了一个非常方便的函数 insert_or_update_many :

  1. 它首先执行一个选择
  2. 如果找到对象,它会更新它们
  3. 否则它会创建它们

但这会引入并发问题。例如:如果在步骤 1 中某个对象不存在,则将其添加到稍后要插入的对象列表中。但在此期间可能会发生另一个 Celery 任务创建了该对象,当它尝试执行批量插入时(第 3 步),我收到重复条目的错误。

我想我需要将这 3 个步骤包装在一个“阻塞” block 中。 我已经阅读了有关事务的文章,并尝试将步骤 1、2、3 包装在 with transaction.commit_on_success: block

with transaction.commit_on_success():
    cursor.execute(sql, parameters)
    existing = set(cursor.fetchall())
    if not skip_update:
        # Find the objects that need to be updated
        update_objects = [o for (o, k) in object_keys if k in existing]
        _update_many(model, update_objects, keys=keys, using=using)
    # Find the objects that need to be inserted.
    insert_objects = [o for (o, k) in object_keys if k not in existing]
    # Filter out any duplicates in the insertion
    filtered_objects = _filter_objects(con, insert_objects, key_fields)
    _insert_many(model, filtered_objects, using=using)

但这对我不起作用。我不确定我是否完全了解这些交易。我基本上需要一个 block ,我可以在其中放置多个操作,以确保没有其他进程或线程正在访问(写入)我的数据库资源。

最佳答案

I basically need a block where I can put several operations being sure no other process or thread is accessing (in write) my db resources.

Django 事务通常不会为您保证。如果您来自计算机科学的其他领域,您自然会认为事务以这种方式阻塞,但在数据库世界中有不同种类的锁,位于不同的 isolation levels。 ,并且它们因每个数据库而异。因此,为确保您的事务执行此操作,您将必须了解事务、锁及其性能特征,以及数据库提供的用于控制它们的机制。

但是,让一堆进程都试图锁定表以执行竞争插入听起来不是一个好主意。如果碰撞很少见,你可以做一种乐观锁定的形式,如果失败就重试事务。或者,也许您可​​以将所有这些 celery 任务定向到一个进程(如果您无论如何都要获取表锁,那么并行化它没有性能优势)。

我的建议是从忘记批量操作开始,使用 Django 的 update_or_create 一次只做一行。 .只要您的数据库具有防止重复条目的约束(听起来确实如此),这就应该没有您上面描述的竞争条件。如果性能确实变得 Not Acceptable ,则考虑更复杂的选项。

optimistic concurrency方法意味着不是通过获取表锁来防止冲突,而是照常进行,然后在出现问题时重试该操作。在您的情况下,它可能类似于:

while True:
    try:
        with transaction.atomic():
            # do your bulk insert / update operation
    except IntegrityError:
        pass
    else:
        break

因此,如果您遇到竞争条件,所产生的 IntegrityError 将导致 transaction.atomic() block 回滚已进行的任何更改,并且while 循环将强制重试事务(假设批量操作现在将看到新存在的行并将其标记为更新而不是插入)。

这种方法在碰撞很少发生的情况下非常有效,而在碰撞频繁的情况下则非常糟糕。

关于python - Django:如何在事务中包装批量更新/插入操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24331420/

相关文章:

SQL 局部最小值和最大值

python - 如何模拟 Django 模型对象(及其方法)?

python - 两部分 : Why doesn't this code call the function I want, 我可以在不为其创建单独函数的情况下执行此操作吗?

Python Twisted WebSocket 客户端

SQL - 如何从另一个表获取多个值以适合 select 语句中的一行?

Sql - 对某些类别进行过滤

django - 帖子不显示保存在 django 数据库中的内容

python - 全局名称 'ParseError' 未定义,我使用 try 和 except 来避免它,但这仍然显示

python - 使用Python从文本文件中提取多行

python - 选择 Pandas 中某个值周围的行