python - 使用多处理队列、池和锁定的简单示例

标签 python python-2.7 multiprocessing

我尝试阅读 http://docs.python.org/dev/library/multiprocessing.html 上的文档但我仍在为多处理队列、池和锁定而苦苦挣扎。现在我能够构建下面的示例。

关于队列和池,我不确定我是否以正确的方式理解了这个概念,如果我错了,请纠正我。我想要实现的是 一次处理 2 个请求(本例中数据列表有 8 个)那么,我应该使用什么?池来创建可以处理两个不同队列(最多 2 个)的 2 个进程,还是我应该每次只使用 Queue 来处理 2 个输入?锁定将正确打印输出。

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)

最佳答案

解决您的问题的最佳方法是利用 Pool。使用 Queue 并拥有单独的“队列馈送”功能可能是矫枉过正。

这是您的程序的一个稍微重新排列的版本,这一次 只有 2 个进程 位于 Pool 中。我相信这是最简单的方法,对原始代码的改动很小:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

请注意,mp_worker() 函数现在接受一个参数(前两个参数的元组),因为 map() 函数将您的输入数据分 block 到子列表中,每个子列表作为一个参数提供给您的工作函数。

输出:

Processs a  Waiting 2 seconds
Processs b  Waiting 4 seconds
Process a   DONE
Processs c  Waiting 6 seconds
Process b   DONE
Processs d  Waiting 8 seconds
Process c   DONE
Processs e  Waiting 1 seconds
Process e   DONE
Processs f  Waiting 3 seconds
Process d   DONE
Processs g  Waiting 5 seconds
Process f   DONE
Processs h  Waiting 7 seconds
Process g   DONE
Process h   DONE

根据下面的@Thales 评论进行编辑:

如果您想要“为每个池限制锁定”,以便您的进程以串联对运行,ala:

A 等待 B 等待 | A 完成,B 完成 | C等待,D等待| C 完成,D 完成 | ...

然后将处理函数更改为为每对数据启动池(2 个进程):

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

现在你的输出是:

 Processs a Waiting 2 seconds
 Processs b Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Processs c Waiting 6 seconds
 Processs d Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Processs e Waiting 1 seconds
 Processs f Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Processs g Waiting 5 seconds
 Processs h Waiting 7 seconds
 Process g  DONE
 Process h  DONE

关于python - 使用多处理队列、池和锁定的简单示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20887555/

相关文章:

python - 使用Python找出2个数字的lcm

python - Python的multiprocessing包中,为什么会有multiprocessing.Pool和multiprocessing.pool.Pool?

python-2.7 - pandas 中基于列条件的多重索引

python - python多处理中父进程全局变量如何复制到子进程

python-3.x - 多处理显示 matplotlib 图

python - 使用 Python 脚本创建 MySQLdb 数据库

python - 带有 virtualenv 的 pycharm 远程项目

python - 提交带有 Mechanize HTTP 错误 500 的表单

python - 多个线程需要访问单个资源

python-2.7 - 使用 pyodbc 与多处理并行更新 MSSQL 表时出现太多死锁错误