Python MongoDB (PyMongo) 多重处理游标

标签 python multithreading mongodb multiprocessing

我正在尝试制作一个多处理 MongoDB 实用程序,它运行良好,但我认为我遇到了性能问题......即使有 20 名 worker ,它每秒处理的文档也不超过 2800 份......我认为我可以快 5 倍...这是我的代码,它没有做任何异常,只是打印到光标末尾的剩余时间。

也许有更好的方法在 MongoDB 游标上执行多处理,因为我需要在每个包含 17.4M 记录集合的文档上运行一些东西,所以性能和更少的时间是必须的。

START = time.time()
def remaining_time(a, b):
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) / t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
    sys.stdout.flush()

def dowork(queue):
    for p, i, pcount in iter(queue.get, 'STOP'):
        remaining_time(pcount, i)


def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = products.count()
        i = 1
        print "Procesando %s productos..." % pcount
        for p in products:
            try:
                queue.put((p, i, pcount))
                i += 1
            except Exception, e:
                utils.log(e)
                continue
    queue.put('STOP')


def main():
    queue = multiprocessing.Queue()

    procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]

    for p in procs:
        p.start()

    populate_jobs(queue)

    for p in procs:
        p.join()

另外,我注意到大约每 2500 个 aprox 文档,脚本就会暂停大约 0.5 - 1 秒,这显然是一个糟糕的问题。这是一个 MongoDB 问题,因为如果我执行完全相同的循环但使用 range(0, 1000000) 脚本根本不会暂停并且以每秒 57,000 次迭代的速度运行,总共 20 秒结束脚本...与每秒 2,800 个 MongoDB 文档的巨大差异...

这是运行 1,000,000 次迭代循环的代码,而不是文档。

def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = 1000000
        i = 1
        print "Procesando %s productos..." % pcount
        for p in range(0, 1000000):
            queue.put((p, i, pcount))
            i += 1
    queue.put('STOP')

更新 如我所见,问题不在于多处理本身,是游标填充了未在多处理模式下运行的 Queue ,这是一个填充 Queue 的简单过程( populateJobs 方法)也许如果我可以使游标多线程/multirpocess 并并行填充 Queue 它会填充得更快,然后多处理方法 dowork 会做得更快,因为我认为存在一个瓶颈,我每秒只能在 Queue 中填充大约 2,800 个项目,而在 dowork 多进程中检索更多项目,但我不这样做不知道如何并行化 MongoDB 游标。

也许,问题是我的计算机和服务器的 MongoDB 之间的延迟。在我请求下一个光标和 MongoDB 告诉我哪个是之间的延迟使我的性能降低了 2000%(从 61,000 str/s 到 2,800 doc/s) 我在本地主机 MongoDB 上试过,性能完全一样......这让我抓狂

最佳答案

下面是如何使用 Pool 喂养 child :

START = time.time()
def remaining_time(a, b):
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) / t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
    sys.stdout.flush()

def dowork(args):
    p, i, pcount  = args
    remaining_time(pcount, i)

def main():
    queue = multiprocessing.Queue()

    procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    pcount = products.count()
    pool.map(dowork, ((p, idx, pcount) for idx,p in enumerate(products)))
    pool.close()
    pool.join()

请注意,使用 pool.map 需要立即将游标中的所有内容加载到内存中,但这可能是一个问题,因为它很大。您可以使用 imap 来避免一次消耗全部内容,但您需要指定一个 chunksize 以最小化 IPC 开销:

# Calculate chunksize using same algorithm used internally by pool.map
chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)
if extra:
   chunksize += 1

pool.imap(dowork, ((p, idx, pcount) for idx,p in enumerate(products)), chunksize=chunksize)
pool.close()
pool.join()

对于 1,000,000 个项目, block 大小为 12,500。您可以尝试比这更大或更小的尺寸,看看它如何影响性能。

不过,如果瓶颈实际上只是从 MongoDB 中提取数据,我不确定这会有多大帮助。

关于Python MongoDB (PyMongo) 多重处理游标,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29745787/

相关文章:

c++ - 具有不同类型的可变参数函数,传递给线程 cout

python - Dragonfly 的模态命令

python - 教程中发现 TensorFlow 错误

c++ - 这个简单的(原子的)锁线程安全吗?

c++ - 增加线程数,但程序无法更快地运行 C++ OpenMP 选择排序

node.js - 在同一台服务器上设置 MongoDB 和 Redis

mongodb - 从嵌套数组中的 id 匹配的另一个集合中获取所有字段

javascript - Meteor js 光标 hasNext() 和 next()?

python - 如何让系统实现RFID卡二次读取退出?

python - 向法学院学生教授 Python