Python multiprocessing Pool: some processes deadlock when forked but run fine when spawned

Tags: python multithreading multiprocessing fork deadlock

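So I'm experimenting with a service that downloads and resizes images (threads download the images and processes resize them). I start the download threads (with a manager thread that monitors them), and as soon as an image is saved locally, I add its path to a queue. Once all images are downloaded, the manager thread adds a poison pill to the queue.
Meanwhile, the main thread watches the queue, takes each path from it as soon as it has been downloaded, and submits a new asynchronous task to the pool to resize the image.
Finally, when I try to join the pool, it sometimes hangs in what looks like a deadlock. It doesn't happen every time, but the more URLs there are in the IMG_URLS list, the more often it happens. When the deadlock occurs, the log shows that some processes either did not start properly or deadlocked immediately, because the "resizing {file}" log line never appears for them.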

import logging
import multiprocessing as mp
import time
from queue import Queue
from threading import Thread


def resize_image(file):
    logging.info(f"resizing {file}")
    time.sleep(0.1)
    logging.info(f"done resizing {file}")


class Service(object):
    def __init__(self):
        self.img_queue = Queue()

    def download_image(self, url) -> None:
        logging.info(f"downloading image from URL {url}")
        time.sleep(1)
        file = f"local-{url}"
        self.img_queue.put(file)
        logging.info(f"image saved to {file}")

    def download_images(self, img_url_list: list):
        logging.info("beginning image downloads")

        threads = []
        for url in img_url_list:
            t = Thread(target=self.download_image, args=(url,))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()
        logging.info("all images downloaded")
        self.img_queue.put(None)

    def resize_images(self):
        logging.info("beginning image resizing")
        with mp.Pool() as p:
            while True:
                file = self.img_queue.get()
                if file is None:
                    logging.info("got SENTINEL")
                    break
                logging.info(f"got {file}")
                p.apply_async(func=resize_image, args=(file,))
            p.close()
            p.join()
        logging.info("all images resized")

    def run(self, img_url_list):
        logging.info("START service")

        dl_manager_thread = Thread(target=self.download_images, args=(img_url_list,))
        dl_manager_thread.start()
        self.resize_images()

        logging.info(f"END service")


if __name__ == "__main__":
    FORMAT = "[%(threadName)s, %(asctime)s, %(levelname)s] %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=FORMAT)

    IMG_URLS = list(range(8))

    service = Service()
    service.run(IMG_URLS)
Running it with Python 3.8.5 (Ubuntu 20.04, Ryzen 2600), I get the following:
[MainThread, 2020-11-30 19:58:01,257, INFO] START service
[Thread-1, 2020-11-30 19:58:01,257, INFO] beginning image downloads
[MainThread, 2020-11-30 19:58:01,257, INFO] beginning image resizing
[Thread-2, 2020-11-30 19:58:01,258, INFO] downloading image from URL 0
[Thread-3, 2020-11-30 19:58:01,258, INFO] downloading image from URL 1
[Thread-4, 2020-11-30 19:58:01,258, INFO] downloading image from URL 2
[Thread-5, 2020-11-30 19:58:01,259, INFO] downloading image from URL 3
[Thread-6, 2020-11-30 19:58:01,260, INFO] downloading image from URL 4
[Thread-7, 2020-11-30 19:58:01,260, INFO] downloading image from URL 5
[Thread-8, 2020-11-30 19:58:01,261, INFO] downloading image from URL 6
[Thread-9, 2020-11-30 19:58:01,262, INFO] downloading image from URL 7
[Thread-2, 2020-11-30 19:58:02,259, INFO] image saved to local-0
[MainThread, 2020-11-30 19:58:02,260, INFO] got local-0
[Thread-3, 2020-11-30 19:58:02,260, INFO] image saved to local-1
[Thread-4, 2020-11-30 19:58:02,260, INFO] image saved to local-2
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-1
[MainThread, 2020-11-30 19:58:02,261, INFO] resizing local-0
[Thread-5, 2020-11-30 19:58:02,261, INFO] image saved to local-3
[Thread-6, 2020-11-30 19:58:02,261, INFO] image saved to local-4
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-2
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-3
[MainThread, 2020-11-30 19:58:02,262, INFO] resizing local-1
[Thread-7, 2020-11-30 19:58:02,262, INFO] image saved to local-5
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-3
[Thread-8, 2020-11-30 19:58:02,263, INFO] image saved to local-6
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-6
[MainThread, 2020-11-30 19:58:02,264, INFO] resizing local-6
[Thread-9, 2020-11-30 19:58:02,264, INFO] image saved to local-7
[MainThread, 2020-11-30 19:58:02,265, INFO] got local-7
[Thread-1, 2020-11-30 19:58:02,265, INFO] all images downloaded
[MainThread, 2020-11-30 19:58:02,265, INFO] got SENTINEL
[MainThread, 2020-11-30 19:58:02,265, INFO] resizing local-7
[MainThread, 2020-11-30 19:58:02,362, INFO] done resizing local-0
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-1
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-3
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-4
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-5
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-6
[MainThread, 2020-11-30 19:58:02,366, INFO] done resizing local-7
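Sometimes it starts hanging at this point. Note that the "resizing local-2" log line is missing, so that process either never started or is waiting for something.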
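If I change the pool to use spawn instead of fork, it works fine:
with mp.get_context("spawn").Pool() as p:
My guess is that in some cases fork copies a lock while it is held, and that this is the problem, but it's not clear to me where or why.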
Any ideas?

Best answer

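Sometimes (when you're unlucky), while your pool is spinning up, one of the child processes gets forked at the very moment one of your download threads is writing a message through the logging module. The logging module serializes message output with a lock, so when the fork happens, that lock can be copied in the locked state. When the download thread finishes writing its message, only the lock in the parent process is released, so the child process is left waiting on its own copy of the lock before it can write its log message. That copy can never be released, because the download thread is not copied into the child (fork copies only the thread that called it). That is the deadlock. This kind of bug can be patched around in some ways, but it is one of the reasons "spawn" exists.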
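The mechanism described above can be reproduced without logging at all. The following is a minimal sketch (Unix only, since it uses the "fork" start method) in which a plain `threading.Lock` stands in for the lock taken while a log record is written; `lock_state_in_forked_child`, `_hold` and `_child` are hypothetical names made up for this illustration. A helper thread holds the lock while the main thread forks a child, and the parent then releases it; the child still sees its own copy as locked.

```python
import multiprocessing as mp
import sys
import threading

# Stand-in for a lock held while a log message is being written
# (illustrative only, not the logging module's actual lock object).
lock = threading.Lock()


def _hold(started: threading.Event, release: threading.Event) -> None:
    # Runs in a helper thread: take the lock and keep it until told to release.
    with lock:
        started.set()
        release.wait()


def _child() -> None:
    # Only the forking thread exists in the child. If another thread held
    # `lock` at fork time, nothing in the child can ever release this copy.
    sys.exit(0 if lock.acquire(timeout=1.0) else 1)


def lock_state_in_forked_child(hold_during_fork: bool) -> str:
    started, release = threading.Event(), threading.Event()
    holder = None
    if hold_during_fork:
        holder = threading.Thread(target=_hold, args=(started, release))
        holder.start()
        started.wait()  # make sure the lock is held before forking
    proc = mp.get_context("fork").Process(target=_child)
    proc.start()  # the fork happens synchronously inside start()
    release.set()  # the parent's copy of the lock is released here...
    if holder is not None:
        holder.join()
    proc.join()  # ...but the child's copy was never unlocked
    return "acquired" if proc.exitcode == 0 else "stuck"
```

Calling `lock_state_in_forked_child(True)` returns `"stuck"` after the child's one-second timeout, while `lock_state_in_forked_child(False)` returns `"acquired"`. In the question's program there is no timeout, so the worker simply blocks forever, which matches the missing "resizing local-2" line.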
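Also, "spawn" is the only start method supported on every platform. It's very easy to use a library that happens to be multithreaded without realizing it, and "fork" is not really multithreading-friendly. If you really need the lower overhead that "fork" provides, there is "forkserver", though I don't know much about it; in theory, it is safe to use with multithreading.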
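A minimal sketch of following this advice, assuming you want "forkserver" where the platform offers it and "spawn" otherwise; `pick_start_method` and `make_pool` are hypothetical helper names, not part of the multiprocessing API. The Python documentation describes the fork server as a single-threaded process (unless system libraries or preloaded imports start threads as a side effect), which is why forking from it is generally safe.

```python
import multiprocessing as mp
from multiprocessing.pool import Pool
from typing import Optional, Sequence


def pick_start_method(preferred: Sequence[str] = ("forkserver", "spawn")) -> str:
    """Return the first start method in `preferred` that this platform supports."""
    available = mp.get_all_start_methods()
    for method in preferred:
        if method in available:
            return method
    raise ValueError(f"none of {list(preferred)} is available; have {available}")


def make_pool(processes: Optional[int] = None) -> Pool:
    # A context only affects pools/processes created through it, unlike
    # mp.set_start_method(), which changes the default for the whole program.
    ctx = mp.get_context(pick_start_method())
    return ctx.Pool(processes)
```

In the question's `resize_images`, `with mp.Pool() as p:` would become `with make_pool() as p:`. With "spawn" and "forkserver", the worker function has to be importable at module level and the entry point has to be guarded by `if __name__ == "__main__":`; the question's code already satisfies both.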

fork

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.
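Given that caveat, one possible patch that keeps "fork" is to create the pool before any download thread exists, so that every worker is forked from a process that has only one thread. The sketch below is a condensed, hypothetical rewrite of the question's service (plain functions instead of the `Service` class, short sleeps standing in for the real download and resize) and runs on Unix only. It assumes nothing else in the program has started threads yet and that the pool never forks replacement workers later, which holds only while no worker dies and `maxtasksperchild` is not set.

```python
import logging
import multiprocessing as mp
import time
from queue import Queue
from threading import Thread


def resize_image(file):
    logging.info(f"resizing {file}")
    time.sleep(0.1)  # stand-in for the real resize work
    return f"resized-{file}"


def download_image(url, img_queue):
    time.sleep(0.2)  # stand-in for the real download
    img_queue.put(f"local-{url}")


def download_images(urls, img_queue):
    threads = [Thread(target=download_image, args=(url, img_queue)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    img_queue.put(None)  # poison pill: no more files are coming


def run(urls):
    img_queue = Queue()
    # Create (and therefore fork) all workers first, while the main thread is
    # assumed to be the only thread, so no other thread can hold a lock yet.
    with mp.get_context("fork").Pool(4) as pool:
        manager = Thread(target=download_images, args=(urls, img_queue))
        manager.start()
        results = []
        while (file := img_queue.get()) is not None:
            # apply_async only sends the task to existing workers; it does not fork.
            results.append(pool.apply_async(resize_image, (file,)))
        manager.join()
        pool.close()
        pool.join()
        return sorted(r.get() for r in results)
```

`run(range(8))` mirrors the question's `IMG_URLS`. The ordering constraint is easy to break as a program grows, which is one more argument for "spawn" or "forkserver".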


Here's some reference material that discusses this issue in more depth, which I used as my main source.

Original question on Stack Overflow: https://stackoverflow.com/questions/65080123/
