Python - 多处理的奇怪行为 - 连接不执行

标签 python queue multiprocessing dataframe python-multiprocessing

我正在使用 multiprocessing python 模块。我有大约 20-25 个任务要同时运行。每个任务将创建一个约 20k 行的 pandas.DataFrame 对象。问题是,所有任务都执行得很好,但是当涉及到“加入”流程时,它就停止了。我试过使用“小型”DataFrames,效果很好。为了说明我的观点,我创建了以下代码。

import pandas
import multiprocessing as mp

def task(arg, queue):
    DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
    queue.put(DF)
    print("DF %d stored" %arg)

listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]

for p in processes:
    p.start()

for i,p in enumerate(processes):
    print("joining %d" %i)
    p.join()

results = [queue.get() for p in processes]

编辑:

使用 DF = pandas.DataFrame({"hello":range(10)}) 我的一切都是正确的:“DF 0 stored”到“DF 19 stored”,与“joining”相同0”到“加入 19”。

但是 DF = pandas.DataFrame({"hello":range(1000)}) 出现了问题:在存储 DF 时,连接步骤在“连接 3”后停止。

感谢您提供有用的提示:)

最佳答案

这个问题在文档中有解释,在 Pipes and Queues 下:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

使用管理器会奏效,但有很多更简单的方法可以解决这个问题:

  1. 先从队列中读取数据,然后加入进程,而不是相反。
  2. 手动管理队列(例如,使用JoinableQueuetask_done)。
  3. 只需使用 Pool.map 而不是重新发明轮子。 (是的,Pool 所做的很多事情对于您的用例来说并不是必需的——但它也不会妨碍您,而且好处是,您已经知道它可以工作。)

我不会展示 #1 的实现,因为它太琐碎了,或者 #2 的实现,因为它太痛苦了,但是 #3:

def task(arg):
    DF = pandas.DataFrame({"hello":range(1000)}) # try range(1000) or range(10000)
    return DF

with mp.Pool(processes=20) as p:
    results = p.map(task, range(20), chunksize=1)

(在2.7中,Pool可能无法在with语句中工作;您可以将更高版本的multiprocessing安装回2.7 关闭 PyPI,或者你可以手动创建池,然后在 try/finally关闭它,如果你会处理一个文件它在 with 语句中不起作用...)


您可能会问自己,为什么此时它会失败,但要使用更小的数字——甚至只是小一点点?

那个 DataFrame 的 pickle 刚好超过 16K。 (这个列表本身有点小,但如果你用 10000 而不是 1000 来尝试,你应该在没有 Pandas 的情况下看到同样的结果。)

因此,第一个 child 写入 16K,然后阻塞,直到有空间写入最后几百个字节。但是在 join 之前,您不会从管道中取出任何东西(通过调用 queue.get),并且您不能join 直到他们退出,直到你疏通管道他们才能退出,所以这是一个典型的僵局。有足够的空间让前 4 个通过,但没有空间让 5 个通过。因为你有 4 个核心,所以大多数时候,前 4 个通过的将是前 4 个。但偶尔#4 会击败#3 或其他什么,然后你将无法加入#3。对于 8 核机器,这种情况会更频繁地发生。

关于Python - 多处理的奇怪行为 - 连接不执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29810041/

相关文章:

c++ - 图形中的DFS需要多少时间间隔?

java - 多线程和队列

python - 在不计算整个数组的排名的情况下确定现有 numpy 数组中新元素的排名的最快方法是什么?

javascript - 如何在 Pybossa 中创建文本注释模板?

远程服务器上的 Python 进程需要通过 ssh 连接到其他服务器并继续运行

python - 在Python中重载优先级队列的比较器

Python 'subprocess' CalledProcessError : Command '[...]' returned non-zero exit status 1

Python 多处理模块 : calling an instance method within a process

比较两个远程线程?

Python 使用多处理