我有一个迭代器函数,可以产生无限的整数流:
def all_ints(start=0):
yield start
yield all_ints(start+1)
我希望有一个线程或进程池一次对这些线程或进程进行计算,最多可达 $POOLSIZE。每个进程可能会将结果保存到某些共享数据结构中,因此我不需要进程/线程函数的返回值。在我看来,使用 python3 Pool 可以实现这一点:
# dummy example functions
def check_prime(n):
return n % 2 == 0
def store_prime(p):
''' synchronize, write to some shared structure'''
pass
p = Pool()
for n in all_ints():
p.apply_async(check_prime, (n,), callback=store_prime)
但是当我运行这个时,我得到一个 python 进程,它不断地使用更多内存(而不是来自迭代器,它可以运行几天)。如果我存储所有 apply_async 调用的结果,我会期望出现这种行为,但我没有。
我在这里做错了什么?或者我应该使用线程池中的另一个 API?
最佳答案
我认为您正在寻找 Pool.imap_unordered ,它使用池化进程将函数应用于迭代器生成的元素。它的参数chunksize
允许您指定在每个步骤中将迭代器中的多少项传递到池中。
此外,我会避免对 IPC 使用任何共享内存结构。只需让发送到池中的“昂贵”函数返回您需要的信息,并在主进程中对其进行处理即可。
这是一个示例(我在 200'000 个结果后中止;如果删除该部分,您将看到进程“永远”在固定数量的 RAM 中愉快地工作):
from multiprocessing import Pool
from math import sqrt
import itertools
import time
def check_prime(n):
if n == 2: return (n, True)
if n % 2 == 0 or n < 2: return (n, False)
for i in range(3, int(sqrt(n))+1, 2):
if n % i == 0: return (n, False)
return (n, True)
def main():
L = 200000 # limit for performance timing
p = Pool()
n_primes = 0
before = time.time()
for (n, is_prime) in p.imap_unordered(check_prime, itertools.count(1), 1000):
if is_prime:
n_primes += 1
if n_primes >= L:
break
print("Computed %d primes in %.1fms" % (n_primes, (time.time()-before)*1000.0))
if __name__ == "__main__":
main()
我的 Intel Core i5(2 核,4 线程)上的输出:
Computed 200000 primes in 15167.9ms
如果我将其更改为 Pool(1)
,则输出,因此仅使用 1 个子进程:
Computed 200000 primes in 37909.2ms
呵呵!
关于无限迭代器上的Python线程/进程池?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41786542/