mongodb - 如何使用 insert_many 方法处理 pymongo AutoReconnect 异常

标签 mongodb pymongo bulkinsert replicaset

我有一个包含 3 个成员的 MongoDB 副本集和一个在其中存储数据的 Python 应用程序。

当使用带有包装器的单个文档插入时,我可以处理 pymongo 的 AutoReconnect 异常,如下所示:

def safe_mongo_call(method, num_retries, *args, **kwargs):
    while True:
        try:
            return method(*args, **kwargs)
        except (pymongo.errors.AutoReconnect,
                pymongo.errors.ServerSelectionTimeoutError) as e:
            if num_retries > 0:
                logger.debug('Retrying MongoDB operation: %s', str(e))
                num_retries -= 1
            else:
                raise

我不确定在使用批量写入时如何处理这些异常,例如insert_many 方法。根据documentation ,批量写入不是原子的,因此即使发生其中一个异常,也可能已经有一些文档成功写入数据库。因此,我不能像上面那样简单地重用包装器方法。

处理这些情况的最佳方法是什么?

最佳答案

对于这种情况,BulkWriteError 必须提供已完成操作的详细信息 https://api.mongodb.com/python/current/examples/bulk.html#ordered-bulk-write-operations

但是如果连接丢失,则会发送自动重新连接,并且操作进度信息似乎会丢失(针对 pymongo==3.5.1 进行了测试)

无论如何,您都需要重建已写入的内容和未写入的内容,并对其余项目重试该操作。 在后一种情况下,会有点困难,因为您没有关于实际编写的内容的事先信息,但仍然可行

作为草图解决方案: 每个要插入的文档都会分配一个ObjectId,除非_id已经存在。您可以自己处理这个问题 - 迭代文档,为丢失的文档手动分配 _id 并将 ID 保存在临时变量中。一旦遇到异常,就会找到最后一个成功插入的 _id,利用类似于二分搜索的方法,最多可以进行 ~O(logN) 查询,并且还可以使用批量操作被拆分为较小批处理的事实(https://api.mongodb.com/python/current/examples/bulk.html#bulk-insert)。但当然,这种方法的适用性取决于 mongod 实例上的负载配置文件以及是否允许额外的查询突发。如果按预期抛出 BulkWriteError,您可以只抓取未插入的文档,然后仅对这些文档重试该操作。

回到自动重新连接问题,我个人会在mongo-python-driver问题跟踪器中开一张票,很可能是一个错误或像这样完成故意的

关于mongodb - 如何使用 insert_many 方法处理 pymongo AutoReconnect 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44450575/

相关文章:

node.js - Mongoose bulkWrite - 'q' 的错误类型

mysql - 在 MySQL 中的一列中插入字符串

node.js - Mongoose :不允许更新特定字段

javascript - MongoDB:如何查询名为 "version"的集合

mongodb - 在 MongoDb 中查询小于 NOW 的日期时间值

python - pymongo.errors.CursorNotFound : cursor id '…' not found at server

linux - 类型错误 : __init__() got an unexpected keyword argument 'timeout' pymongo

javascript - 使用express查找单个mongodb文档

python - PyMongo,处理相对于数据库时间的时间字段

python - 如何批量插入或创建多个数据