Python 3.6 multiprocessing.Pool intermittently fails to exit on KeyboardInterrupt

Tags: python multithreading python-3.x multiprocessing

I'm trying to use multiprocessing.Pool to asynchronously dispatch some jobs to external processes, e.g.:

#!/bin/env python3
'''
    test.py
'''
import multiprocessing.util
from multiprocessing import Pool
import shlex
import subprocess
from subprocess import PIPE

multiprocessing.util.log_to_stderr(multiprocessing.util.DEBUG)

def waiter(arg):
    cmd = "sleep 360"
    cmd_arg = shlex.split(cmd)
    p = subprocess.Popen(cmd_arg, stdout=PIPE, stderr=PIPE) 
    so, se = p.communicate()
    print (f"{so}\n{se}")
    return arg

def main1():
    proc_pool = Pool(4)
    it = proc_pool.imap_unordered(waiter, range(0, 4))
    for r in it:
        print (r)

if __name__ == '__main__':
    main1()
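For context on why the tracebacks below all show a `KeyboardInterrupt` raised inside `p.communicate()`: Ctrl-C delivers SIGINT to every process in the terminal's foreground process group, which includes the main script, the forked pool workers, and the `sleep` children alike. A minimal sketch (my illustration, not part of the question) confirming that pool workers share the parent's process group:

```python
import os
import multiprocessing

def show_pgid(_):
    # Pool workers inherit the parent's process group, so a
    # terminal-generated SIGINT reaches them too.
    return os.getpgid(0)

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        worker_groups = set(pool.map(show_pgid, range(2)))
    # Every worker reports the same process group as the parent.
    assert worker_groups == {os.getpgid(0)}
```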

I want it to terminate all of the invoked subprocesses, the pool workers, and itself on SIGINT. Currently, this works with a pool size of 4:
$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140194873397248
[DEBUG/MainProcess] created semlock with handle 140194873393152
[DEBUG/MainProcess] created semlock with handle 140194873389056
[DEBUG/MainProcess] created semlock with handle 140194873384960
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-2] process shutting down
[DEBUG/ForkPoolWorker-2] running all "atexit" [DEBUG/ForkPoolWorker-3] runni[DEBUG/ForkPoolWorker-2] running the remaining[DEBUG/ForkPoolWorker-3] running the remaining "atexit" finalizers
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "./test.py", line 14, in waiter
    so, se = p.communicate()
  File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
    stdout, stderr = self._communicate(input, en[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] result handler found thread._state=TERMINATE
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=1, thread._state=2
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
= self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-2] process exiting with exitcode 1
Process ForkPoolWorker-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "./test.py", line 14, in waiter
    so, se = p.communicate()
  File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
    stdout, stderr = self._communicate(input, endtime, timeout)
  File "/usr/local/lib/python3.6/subprocess.py", line 1496, in _communicate
    ready = selector.select(timeout)
  File "/usr/local/lib/python3.6/selectors.py", line 376, in select
    fd_event_list = self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-3] process exiting with exitcode 1
$>

But as the pool size increases, it starts to fail intermittently, e.g.:
$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140143972425728
[DEBUG/MainProcess] created semlock with handle 140143972421632
[DEBUG/MainProcess] created semlock with handle 140143972417536
[DEBUG/MainProcess] created semlock with handle 140143972413440
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-12] process shutting down
[DEBUG/ForkPoolWorker-12] running all "atexit"[DEBUG/ForkPoolWorker-8] runnin[DEBUG/ForkPoolWorker-12] running the remaini[DEBUG/ForkPoolWorker-8] running the remaining "atexit" finalizers
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "./test.py", line 14, in waiter
    so, se = p.communicate()
  File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
    stdout, stderr = self._communicate(input, endProcess ForkPoolWorker-9:
Traceback (most recent call last)[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] task handler exiting
(... Hangs here until I hit Ctrl-C again ...)
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 534, in _terminate_pool
    cls._help_stuff_finish(inqueue, task_handler, len(pool))
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 519, in _help_stuff_finish
    inqueue._rlock.acquire()
KeyboardInterrupt
$>

Most paths lead here, but the bug that workaround refers to appears to be nominally fixed in 3.3+, which is why it even works some of the time; in any case I tried that solution and it did not resolve the issue. Does anyone have a fix, or suggestions for debugging this further?

Environment: Python 3.6.1 on a 32-core SuSE box.

Best Answer

So after a lot more digging, I tried some other combinations of the suggestions in that post, and this seems to solve the problem. As far as I can tell, if you allow SIGINT to propagate to the child processes, as it does by default, and the pool has many more workers than jobs, a pool worker can be killed without releasing the lock on ._inqueue. _help_stuff_finish then deadlocks trying to acquire a lock that the worker never released.

#!/bin/env python3.6
from multiprocessing import Pool
import shlex
import subprocess
from subprocess import PIPE
import signal

def waiter(arg):
    cmd = "sleep 360"
    cmd_arg = shlex.split(cmd)
    p = subprocess.Popen(cmd_arg, stdout=PIPE, stderr=PIPE)
    so, se = p.communicate()
    print (f"{so}\n{se}")
    return arg

########################
# Adapted from: https://stackoverflow.com/a/44869451 
#

proc_pool = None

def int_handler(*args, **kwargs):
    # Parent-side SIGINT handler: tear down the pool, then exit.
    if proc_pool:
        proc_pool.terminate()
        proc_pool.join()
        exit(1)

def initializer():
    # Pool workers ignore SIGINT; only the parent reacts to Ctrl-C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

proc_pool = Pool(32, initializer=initializer)
# Install the handler in the parent (without this, int_handler never runs).
signal.signal(signal.SIGINT, int_handler)

def main1():
    it = proc_pool.imap_unordered(waiter, range(0, 4))
    for r in it:
        print (r)

if __name__ == '__main__':
    main1()
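The same idea (workers ignore SIGINT, the parent reacts) also works without a module-level signal handler: catch KeyboardInterrupt in the parent and terminate the pool explicitly. A sketch with a hypothetical `work` function for illustration:

```python
import signal
from multiprocessing import Pool

def init_worker():
    # Pool workers ignore SIGINT, so they are never killed while
    # holding the inqueue lock; only the parent sees Ctrl-C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def work(x):
    return x * x

if __name__ == "__main__":
    pool = Pool(4, initializer=init_worker)
    try:
        results = pool.map(work, range(4))
        print(results)
        pool.close()
    except KeyboardInterrupt:
        # Parent handles Ctrl-C: terminate workers and bail out cleanly.
        pool.terminate()
    finally:
        pool.join()
```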

Regarding "Python 3.6 multiprocessing.Pool intermittently fails to exit on KeyboardInterrupt", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50417746/
