在 Python 的 concurrent.futures
标准模块中,为什么 ProcessPoolExecutor
中的运行调用数是 max_workers + 1
而不是 max_workers
就像在 ThreadPoolExecutor
中一样?只有当提交的调用数量严格大于池工作进程的数量时才会发生这种情况。
以下 Python 代码片段向 ProcessPoolExecutor
中的 2 个 worker 提交了 8 次调用:
import concurrent.futures
import time
def call():
while True:
time.sleep(1)
if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(call) for _ in range(8)]
time.sleep(5)
for future in futures:
print(future.running())
打印这个(3 个运行中的调用;意外的,因为有 2 个工作人员):
True
True
True
False
False
False
False
False
在使用 ThreadPoolExecutor
时打印此(2 个正在运行的调用;预期):
True
True
False
False
False
False
False
False
最佳答案
好吧,我不会太相信这个 running()
方法。似乎并不能真正反射(reflect)实际的运行状态。
确保进程状态的最好方法是让它们打印/更新一些东西。我选择使用 multiprocessing.Manager().dict()
对象创建共享字典。
这个进程同步对象可以从任何进程安全地查询/更新,并且具有共享状态,即使在多进程环境中也是如此。
每次启动进程时,使用 PID 作为键并使用 True
作为值更新共享字典。退出时设置 False
。
import concurrent.futures
import multiprocessing
import time,os
def call(shared_dict):
shared_dict[os.getpid()] = True
print("start",shared_dict)
time.sleep(10)
shared_dict[os.getpid()] = False
print("end",shared_dict)
if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
shared_dict = multiprocessing.Manager().dict()
futures = [executor.submit(call,shared_dict) for _ in range(8)]
time.sleep(5)
for future in futures:
print(future.running())
这是我得到的输出:
start {3076: True}
start {9968: True, 3076: True}
True
True
True
True
True
False
False
False
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
end {9968: False, 3076: False}
如您所见,我有 5 个正在运行的进程。而我的字典清楚地表明
- 同时运行的进程不超过 2 个
- 进程在开始时只创建一次,然后重新用于执行进一步的调用(毕竟它是一个池)
让我们检查一下极简主义 documentation :
running() Return True if the call is currently being executed and cannot be cancelled.
它似乎反射(reflect)了一种与取消 Future
对象 future 执行的可能性相关的状态(因为它还没有正确初始化/连接到通信队列,现在仍然是取消它的时候) 而不是进程本身的实际“运行”状态。
这可能就是 source code 中的这条评论表示下面的 set_running_or_notify_cancel
定义:
Mark the future as running or process any cancel notifications.
If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (though calls to as_completed() or wait()) are notified and False is returned.
If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.
我们再次了解到,最好让子流程协作、发布它们的状态,而不是试图使用未明确记录的方法来勒索它。
关于python - Python 中 ProcessPoolExecutor 的运行调用次数不正确,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56587166/