我必须在 MongoDB 上执行大量插入和更新操作。
我正在尝试测试多处理来完成这些任务。为此,我创建了这个简单的代码。我的虚拟数据是:
documents = [{"a number": i} for i in range(1000000)]
没有多处理:
time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
col.insert_one(doc)
time1f = time.time()
print(time1f-time1s)
我有 150 秒。
对于多处理,我根据需要和 Pymongo's FAQs 中的描述定义了以下工作函数.
def insert_doc(document):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_one(document)
但是,当我运行我的代码时:
time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, documents)
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)
我得到一个错误:
pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 99] Cannot assign requested address
在出现错误之前,总共处理了 26447 个文档。这个错误解释here ,尽管遇到该错误的人没有使用多处理。那里的解决方案是只打开一个 MongoClient,但是当我想进行多处理时这是不可能的。有什么解决方法吗?感谢您的帮助。
最佳答案
您的代码为示例中的每百万个文档创建了一个新的 MongoClient(就像您链接到的问题一样)。这要求您为每个新查询打开一个新套接字。这会破坏 PyMongo 的连接池,除了速度极慢之外,它还意味着您打开和关闭套接字的速度快于您的 TCP 堆栈可以跟上的速度:您将太多套接字留在 TIME_WAIT 状态,因此您最终会耗尽端口。
如果您向每个客户端插入大量文档,则可以创建更少的客户端,因此打开更少的套接字:
import multiprocessing as mp
import time
from pymongo import MongoClient
documents = [{"a number": i} for i in range(1000000)]
def insert_doc(chunk):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_many(chunk)
chunk_size = 10000
def chunks(sequence):
# Chunks of 1000 documents at a time.
for j in range(0, len(sequence), chunk_size):
yield sequence[j:j + chunk_size]
time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, chunks(documents))
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)
关于python - Pymongo 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41104582/