python - "EOF error"在程序退出时使用多处理队列和线程

标签 python multithreading python-3.x queue multiprocessing

我很难理解为什么这个简单的程序会引发 EOFError在最后。

我正在使用 Queue()Thread() 沟通我想自动干净地终止 atexit我的程序。

import threading
import multiprocessing
import atexit

class MyClass:

    def __init__(self):
        self.queue = None
        self.thread = None

    def start(self):
        self.queue = multiprocessing.Queue()
        self.thread = threading.Thread(target=self.queued_writer, daemon=True)
        self.thread.start()

        # Remove this: no error
        self.queue.put("message")

    def queued_writer(self):
        while 1:
            msg = self.queue.get()
            print("Message:", msg)
            if msg is None:
                break

    def stop(self):
        self.queue.put(None)
        self.thread.join()

instance = MyClass()

atexit.register(instance.stop)

# Put this before register: no error
instance.start()

这引发了:

Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 21, in queued_writer
    msg = self.queue.get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError

此外,此代码段的行为很奇怪:如果我删除 self.queue.put("message") 行,不会引发任何错误并且线程会成功退出。同样,如果在 atexit.register() 之前调用 instance.start(),这似乎也有效。

有谁知道错误从何而来?

编辑:我注意到使用 SimpleQueue()似乎使错误消失。

最佳答案

问题来自多个 atexit.register() 之间的冲突电话。

文档指出:

atexit runs these functions in the reverse order in which they were registered; if you register A, B, and C, at interpreter termination time they will be run in the order C, B, A.

[...]

The assumption is that lower level modules will normally be imported before higher level modules and thus must be cleaned up later.

通过首先导入 multiprocessing 然后调用 atexit.register(my_stop),你会期望你的停止函数在任何内部终止过程之前执行......但是这个情况并非如此,因为 atexit.register() 可能会被动态调用。

在当前情况下,multiprocessing 库使用 _exit_function旨在干净地关闭内部线程和队列的函数。此函数在 atexit 中注册 at the module level , 但是模块只加载了 once the Queue() object is initialized .

因此,MyClass 停止函数在 multiprocessing 的函数 之前被注册,因此 instance.stop _exit_function 之后调用。

在终止期间,_exit_function 关闭内部管道连接,因此如果线程稍后尝试使用关闭的读取连接调用 .get() EOFError 被引发。只有当 Python 没有时间在最后自动杀死 daemon 线程时才会发生这种情况,也就是说,如果一个“慢”退出函数(如 time.sleep(0.1)或者在这种情况下 thread.join()) 在通常的关闭过程之后注册并运行。由于某种原因,写连接关闭被延迟,因此 .put() 不会立即引发错误。

至于为什么对代码片段进行小的修改使其工作:SimpleQueue 没有 Finalizer 所以内部管道稍后关闭。 Queue 的内部线程在第一个 .put() 被调用之前不会启动,因此删除它意味着没有管道可以关闭。也可以通过导入 multiprocessing.queues 来强制注册。

关于python - "EOF error"在程序退出时使用多处理队列和线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49209385/

相关文章:

python - Pandas 在多个大型数据帧中找到共同的 NA 记录

java - Java 中的异步 Web 请求?

python - 如何将 Numba "@vectorize"ufunc 与结构化 Numpy 数组一起使用?

Python:os.fork() 是如何工作的?

python - 如何为 Jupyter notebook 中的每个单元格启用计时魔法?

c++ - 您需要加入已取消的话题吗? (线程)

c# - 来自取消 token 的任务?

python - 在 Python 中测试类的正确方法是什么?

python - 通过索引获取字符串的多个字符

python - 抓取中间有 <br> 的表格(不出现)