python - 'DupFd' 中的属性错误 'multiprocessing.resource_sharer' | Python 多处理 + 线程

标签 python multithreading concurrency multiprocessing python-multiprocessing

我正在尝试在多个 threading.Thread 之间进行通信(s) 执行 I/O 密集型任务和多个 multiprocessing.Process (es) 执行 CPU 密集型任务。每当一个线程找到一个进程的工作时,它就会被放在 multiprocessing.Queue 上。 ,连同 multiprocessing.Pipe(duplex=False) 的发送端。然后,进程完成自己的部分,并通过管道将结果发送回线程。此过程似乎在大约 70% 的情况下有效,另外 30% 我收到 AttributeError: Can't get attribute 'DupFd' on <module 'multiprocessing.resource_sharer' from '/usr/lib/python3.5/multiprocessing/resource_sharer.py'>

重现:

import multiprocessing
import threading
import time

def thread_work(work_queue, pipe):
    while True:
        work_queue.put((threading.current_thread().name,  pipe[1]))
        received = pipe[0].recv()
        print("{}: {}".format(threading.current_thread().name, threading.current_thread().name == received))
        time.sleep(0.3)

def process_work(work_queue):
    while True:
        thread, pipe = work_queue.get()
        pipe.send(thread)

work_queue = multiprocessing.Queue()
for i in range(0,3):
    receive, send = multiprocessing.Pipe(duplex=False)
    t = threading.Thread(target=thread_work, args=[work_queue, (receive, send)])
    t.daemon = True
    t.start()

for i in range(0,2):
    p = multiprocessing.Process(target=process_work, args=[work_queue])
    p.daemon = True
    p.start()

time.sleep(5)

我查看了多处理 source code ,但无法理解为什么会出现此错误。 我尝试使用 queue.Queue ,或带有 duplex=True 的管道(默认)但无法在错误中找到模式。有谁知道如何调试这个吗?

最佳答案

您正在此处 fork 一个已经是多线程的主进程。众所周知,这通常是有问题的。

It is in-fact problem prone (and not just in Python). The rule is "thread after you fork, not before". Otherwise, the locks used by the thread executor will get duplicated across processes. If one of those processes dies while it has the lock, all of the other processes using that lock will deadlock -Raymond Hettinger.

您收到的错误的触发器显然是子进程中管道的文件描述符的复制失败。

要解决此问题,只要主进程仍然是单线程就创建子进程,或者使用另一个 start_method 创建新进程,例如“spawn”(Windows 上默认)或“forkserver”(如果可用)。

forkserver

When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.

Available on Unix platforms which support passing file descriptors over Unix pipes. docs

您可以指定另一个 start_method:

multiprocessing.set_start_method(method) Set the method which should be used to start child processes. method can be 'fork', 'spawn' or 'forkserver'.

Note that this should be called at most once, and it should be protected inside the if name == 'main' clause of the main module. docs

有关特定 start_methods 的基准(在 Ubuntu 18.04 上),请查看 here .

关于python - 'DupFd' 中的属性错误 'multiprocessing.resource_sharer' | Python 多处理 + 线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52247576/

相关文章:

python - Pandas 'DataFrame' 对象没有属性 'unique'

python - 使用 Selenium 进行测试时如何调整 Chrome 和 Firefox 中的窗口大小?

c# - 无法从其他线程使用 DependencyObject

c# - Ado.net 从多线程调用存储过程比单线程慢

并发和 slice 迭代

php 和 ruby​​ 脚本访问相同的数据库和表会导致损坏吗?

Python 类继承和 __dict__ 查找

python - 确定 Django 模板中元素的高度

c++ - 何时在 C++ 中使用多线程?

java - 有没有一种简单的方法可以告诉 Java Executor 在给定时刻正在运行哪些任务?