无限迭代器上的Python线程/进程池?

标签 python multithreading threadpool python-multiprocessing

我有一个迭代器函数,可以产生无限的整数流:

def all_ints(start=0):
  yield start
  yield all_ints(start+1)

我希望有一个线程或进程池一次对这些线程或进程进行计算,最多可达 $POOLSIZE。每个进程可能会将结果保存到某些共享数据结构中,因此我不需要进程/线程函数的返回值。在我看来,使用 python3 Pool 可以实现这一点:

# dummy example functions
def check_prime(n):
  return n % 2 == 0

def store_prime(p):
    ''' synchronize, write to some shared structure'''
    pass

p = Pool()

for n in all_ints():
    p.apply_async(check_prime, (n,), callback=store_prime)

但是当我运行这个时,我得到一个 python 进程,它不断地使用更多内存(而不是来自迭代器,它可以运行几天)。如果我存储所有 apply_async 调用的结果,我会期望出现这种行为,但我没有。

我在这里做错了什么?或者我应该使用线程池中的另一个 API?

最佳答案

我认为您正在寻找 Pool.imap_unordered ,它使用池化进程将函数应用于迭代器生成的元素。它的参数chunksize允许您指定在每个步骤中将迭代器中的多少项传递到池中。

此外,我会避免对 IPC 使用任何共享内存结构。只需让发送到池中的“昂贵”函数返回您需要的信息,并在主进程中对其进行处理即可。

这是一个示例(我在 200'000 个结果后中止;如果删除该部分,您将看到进程“永远”在固定数量的 RAM 中愉快地工作):

from multiprocessing import Pool
from math import sqrt
import itertools
import time

def check_prime(n): 
    if n == 2: return (n, True)
    if n % 2 == 0 or n < 2: return (n, False)
    for i in range(3, int(sqrt(n))+1, 2):
        if n % i == 0: return (n, False)
    return (n, True)    

def main():
    L = 200000   # limit for performance timing 
    p = Pool()
    n_primes = 0
    before = time.time()
    for (n, is_prime) in p.imap_unordered(check_prime, itertools.count(1), 1000):
        if is_prime:
            n_primes += 1
            if n_primes >= L: 
                break
    print("Computed %d primes in %.1fms" % (n_primes, (time.time()-before)*1000.0))
if __name__ == "__main__":
    main()

我的 Intel Core i5(2 核,4 线程)上的输出:

Computed 200000 primes in 15167.9ms

如果我将其更改为 Pool(1),则输出,因此仅使用 1 个子进程:

Computed 200000 primes in 37909.2ms

呵呵!

关于无限迭代器上的Python线程/进程池?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41786542/

相关文章:

c# - 在 C# 中的循环中使用 ThreadPool

c# - Thread.Sleep 会影响 ThreadPool 吗?

Python PIL - 不透明度 > 0 的所有 PNG 区域都将其不透明度设置为 1

python - 在 Tkinter Entry 上绑定(bind)多个事件?

python - UTF-8编码、字典查找

python - django - 这条线在这里实现了什么?

java - Java 中的可伸缩多线程 String 对象

java - 如果发生异常则退出 Java Callable

c++ - 线程安全的 C++ 堆栈

c++ - 终止 PPL 线程池中的线程