问题
来自 multiprocessing.Pool
docs :
apply_async(func ...)
: A variant of theapply()
method which returns a result object. ...
进一步阅读......
apply(func[, args[, kwds]])
: Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.
最后一行粗体表示只使用池中的一名 worker 。我发现这仅在特定条件下是正确的。
给定
这是在三个类似情况下执行 Pool.apply_async()
的代码。在所有情况下,都会打印进程 ID。
import os
import time
import multiprocessing as mp
def blocking_func(x, delay=0.1):
"""Return a squared argument after some delay."""
time.sleep(delay) # toggle comment here
return x*x, os.getpid()
def apply_async():
"""Return a list applying func to one process with a callback."""
pool = mp.Pool()
# Case 1: From the docs
results = [pool.apply_async(os.getpid, ()) for _ in range(10)]
results = [res.get(timeout=1) for res in results]
print("Docs :", results)
# Case 2: With delay
results = [pool.apply_async(blocking_func, args=(i,)) for i in range(10)]
results = [res.get(timeout=1)[1] for res in results]
print("Delay :", results)
# Case 3: Without delay
results = [pool.apply_async(blocking_func, args=(i, 0)) for i in range(10)]
results = [res.get(timeout=1)[1] for res in results]
print("No delay:", results)
pool.close()
pool.join()
if __name__ == '__main__':
apply_async()
结果
例子来自 docs (案例 1)确认只有一名 worker 在运行。我们在下一个案例中通过应用 blocking_func
来扩展这个例子,它会延迟一些阻塞。
注释 blocking_func()
中的 time.sleep()
行使所有情况都一致。
# Time commented
# 1. Docs : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 2. Delay : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 3. No delay: [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
每次调用 apply_async()
都会创建一个新的进程池,这就是新进程 ID 与后者不同的原因。
# Time uncommented
# 1. Docs : [6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780]
# 2. Delay : [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]
# 3. No delay: [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]
然而,当 time.sleep()
未被注释时,即使延迟为零,也会使用多个 worker。
简而言之,在未注释的情况下,我们希望在案例 1 中有一个 worker ,但在案例 2 和案例 3 中我们有多个 worker 。
问题
虽然我希望 Pool().apply_async()
只使用一个 worker,但为什么在 time.sleep()
未注释时使用了多个?阻塞甚至会影响 apply
或 apply_async
使用的 worker 数量吗?
注意:之前的相关问题是“为什么只使用一名 worker ?”这个问题问的是相反的问题——“为什么不是只使用一个 worker ?”我在 Windows 机器上使用 2 个内核。
最佳答案
您的困惑似乎来自于认为 [pool.apply_async(...) for i in range(10)]
是一个调用,当真的有十个独立调用时。对任何池方法的调用都是一项“工作”。一项工作通常会导致分配一个或多个任务。 apply
- 方法在幕后总是只产生一个任务。任务是一个不可分割的工作单元,将作为一个整体由随机池工作人员接收。
只有一个共享的inqueue
,所有worker都被fed了。哪个空闲工作人员将从等待 get()
中唤醒,该队列中的任务取决于操作系统。案例 1 的结果熵仍然有些令人惊讶并且可能非常幸运,至少除非您确认您只有两个核心。
是的,您对这次运行的观察也受到任务所需计算时间的影响,因为线程(进程中的计划执行单元)通常使用时间切片策略进行计划(例如,Windows 为 ~20ms)。
关于python - 为什么在 `multiprocessing.Pool().apply_async()` 中使用了不止一名 worker ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54757400/