python - Python 中 ProcessPoolExecutor 的运行调用次数不正确

标签 python concurrency future process-pool

在 Python 的 concurrent.futures 标准模块中,为什么 ProcessPoolExecutor 中的运行调用数是 max_workers + 1 而不是 max_workers 就像在 ThreadPoolExecutor 中一样?只有当提交的调用数量严格大于池工作进程的数量时才会发生这种情况。

以下 Python 代码片段向 ProcessPoolExecutor 中的 2 个 worker 提交了 8 次调用:

import concurrent.futures
import time


def call():
    while True:
        time.sleep(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(call) for _ in range(8)]
        time.sleep(5)

        for future in futures:
            print(future.running())

打印这个(3 个运行中的调用;意外的,因为有 2 个工作人员):

True
True
True
False
False
False
False
False

在使用 ThreadPoolExecutor 时打印此(2 个正在运行的调用;预期):

True
True
False
False
False
False
False
False

最佳答案

好吧,我不会太相信这个 running() 方法。似乎并不能真正反射(reflect)实际的运行状态。

确保进程状态的最好方法是让它们打印/更新一些东西。我选择使用 multiprocessing.Manager().dict() 对象创建共享字典。

这个进程同步对象可以从任何进程安全地查询/更新,并且具有共享状态,即使在多进程环境中也是如此。

每次启动进程时,使用 PID 作为键并使用 True 作为值更新共享字典。退出时设置 False

import concurrent.futures
import multiprocessing
import time,os


def call(shared_dict):
    shared_dict[os.getpid()] = True
    print("start",shared_dict)
    time.sleep(10)
    shared_dict[os.getpid()] = False
    print("end",shared_dict)


if __name__ == "__main__":

    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        shared_dict = multiprocessing.Manager().dict()
        futures = [executor.submit(call,shared_dict) for _ in range(8)]
        time.sleep(5)
        for future in futures:
            print(future.running())

这是我得到的输出:

start {3076: True}
start {9968: True, 3076: True}
True
True
True
True
True
False
False
False
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
end {9968: False, 3076: False}

如您所见,我有 5 个正在运行的进程。而我的字典清楚地表明

  • 同时运行的进程不超过 2 个
  • 进程在开始时只创建一次,然后重新用于执行进一步的调用(毕竟它是一个池)

让我们检查一下极简主义 documentation :

running() Return True if the call is currently being executed and cannot be cancelled.

它似乎反射(reflect)了一种与取消 Future 对象 future 执行的可能性相关的状态(因为它还没有正确初始化/连接到通信队列,现在仍然是取消它的时候) 而不是进程本身的实际“运行”状态。

这可能就是 source code 中的这条评论表示下面的 set_running_or_notify_cancel 定义:

Mark the future as running or process any cancel notifications.

If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (though calls to as_completed() or wait()) are notified and False is returned.

If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.

我们再次了解到,最好让子流程协作、发布它们的状态,而不是试图使用未明确记录的方法来勒索它。

关于python - Python 中 ProcessPoolExecutor 的运行调用次数不正确,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56587166/

相关文章:

python - 根据 Python 的 pandas DataFrame 类,什么是大小可变的?

python 2.7 : How to extend a class from an imported base class

python - 如何在python中将网页转换为pdf,就像打印中的另存为pdf选项一样

python - 扭曲或 celery ?哪个适合我的具有大量 SOAP 调用的应用程序?

java - 通过拆分和运行将 ListenableFuture<Iterable<A>> 转换为 Iterable<ListenableFuture<B>>

python - 有没有办法更改函数外部定义的变量或函数

multithreading - 在顺序执行的线程中使用ArrayBuffer?

concurrency - 使select语句同时等待多个 channel

stream - 如何通过 io::Write 特征写入来通过 futures Stream 发送数据?

list - Flutter/Dart-有关如何对 future 进行故障排除的建议?