我很难理解为什么这个简单的程序会引发 EOFError
在最后。
我正在使用 Queue()
与 Thread()
沟通我想自动干净地终止 atexit
我的程序。
import threading
import multiprocessing
import atexit
class MyClass:
def __init__(self):
self.queue = None
self.thread = None
def start(self):
self.queue = multiprocessing.Queue()
self.thread = threading.Thread(target=self.queued_writer, daemon=True)
self.thread.start()
# Remove this: no error
self.queue.put("message")
def queued_writer(self):
while 1:
msg = self.queue.get()
print("Message:", msg)
if msg is None:
break
def stop(self):
self.queue.put(None)
self.thread.join()
instance = MyClass()
atexit.register(instance.stop)
# Put this before register: no error
instance.start()
这引发了:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 21, in queued_writer
msg = self.queue.get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
此外,此代码段的行为很奇怪:如果我删除 self.queue.put("message")
行,不会引发任何错误并且线程会成功退出。同样,如果在 atexit.register()
之前调用 instance.start()
,这似乎也有效。
有谁知道错误从何而来?
编辑:我注意到使用 SimpleQueue()
似乎使错误消失。
最佳答案
问题来自多个 atexit.register()
之间的冲突电话。
文档指出:
atexit
runs these functions in the reverse order in which they were registered; if you registerA
,B
, andC
, at interpreter termination time they will be run in the orderC
,B
,A
.[...]
The assumption is that lower level modules will normally be imported before higher level modules and thus must be cleaned up later.
通过首先导入 multiprocessing
然后调用 atexit.register(my_stop)
,你会期望你的停止函数在任何内部终止过程之前执行......但是这个情况并非如此,因为 atexit.register()
可能会被动态调用。
在当前情况下,multiprocessing
库使用 _exit_function
旨在干净地关闭内部线程和队列的函数。此函数在 atexit
中注册 at the module level , 但是模块只加载了 once the Queue()
object is initialized .
因此,MyClass
停止函数在 multiprocessing
的函数 之前被注册,因此 instance.stop
在 _exit_function
之后调用。
在终止期间,_exit_function
关闭内部管道连接,因此如果线程稍后尝试使用关闭的读取连接调用 .get()
, EOFError
被引发。只有当 Python 没有时间在最后自动杀死 daemon
线程时才会发生这种情况,也就是说,如果一个“慢”退出函数(如 time.sleep(0.1)
或者在这种情况下 thread.join()
) 在通常的关闭过程之后注册并运行。由于某种原因,写连接关闭被延迟,因此 .put()
不会立即引发错误。
至于为什么对代码片段进行小的修改使其工作:SimpleQueue
没有 Finalizer
所以内部管道稍后关闭。 Queue
的内部线程在第一个 .put()
被调用之前不会启动,因此删除它意味着没有管道可以关闭。也可以通过导入 multiprocessing.queues
来强制注册。
关于python - "EOF error"在程序退出时使用多处理队列和线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49209385/