python - 星图与 tqdm 结合?

标签 python multiprocessing python-multiprocessing tqdm process-pool

我在做一些并行处理,如下:

with mp.Pool(8) as tmpPool:
        results = tmpPool.starmap(my_function, inputs)

输入看起来像: [(1,0.2312),(5,0.52) ...] 即 int 和 float 的元组。

代码运行良好,但我似乎无法将它包裹在加载栏 (tqdm) 周围,例如可以使用 imap 方法完成,如下所示:

tqdm.tqdm(mp.imap(some_function,some_inputs))

这也可以用于星图吗?

谢谢!

最佳答案

使用 starmap() 是不可能的,但是可以通过添加 Pool.istarmap() 的补丁来实现。它基于 imap() 的代码。您所要做的就是创建 istarmap.py 文件并导入模块以应用补丁,然后再进行常规的多处理导入。

python <3.8

# istarmap.py for Python <3.8
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")

    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self._cache)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Python 3.8+

# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    self._check_running()
    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

然后在你的脚本中:

import istarmap  # import to apply patch
from multiprocessing import Pool
import tqdm    


def foo(a, b):
    for _ in range(int(50e6)):
        pass
    return a, b    


if __name__ == '__main__':

    with Pool(4) as pool:
        iterable = [(i, 'x') for i in range(10)]
        for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
                           total=len(iterable)):
            pass

关于python - 星图与 tqdm 结合?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57354700/

相关文章:

python - 如何避免pyspark中表达式中的多个窗口函数

parallel-processing - MPI 程序的断言函数

python - Tensorflow多处理; UnknownError : Could not start gRPC server

python - 多处理 : optimize CPU usage for concurrent HTTP async requests

python - 子进程挂起多处理

Python 随机 URL 选择

python - 在python中有条件地匹配两个数据库

python - 如何使用 python regex 将正则表达式与其中的一个数字完全匹配?

python - "bucketsort"使用 python 多处理

python multiprocessing .join() 死锁取决于工作函数