python - 使用 Python 在 Windows 上实现并发/并行

标签 python windows multiprocessing

我开发了一个简单的程序来解决八皇后问题。现在我想用不同的元参数做更多的测试,所以我想让它更快。我经历了几次分析迭代,能够显着缩短运行时间,但我认为只有部分计算并发才能使其更快。我尝试使用 multiprocessingconcurrent.futures 模块,但它并没有显着改善运行时间,在某些情况下甚至减慢了执行速度。那只是提供一些上下文。

我能够想出类似的代码结构,其中顺序版本胜过并发。

import numpy as np
import concurrent.futures
import math
import time
import multiprocessing

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def generate_data(seed):
    np.random.seed(seed)
    numbers = []
    for _ in range(5000):
        nbr = np.random.randint(50000, 100000)
        numbers.append(nbr)
    return numbers

def run_test_concurrent(numbers):
    print("Concurrent test")
    start_tm = time.time()
    chunk = len(numbers)//3
    primes = None
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as pool:
        primes = list(pool.map(is_prime, numbers, chunksize=chunk))
    print("Time: {:.6f}".format(time.time() - start_tm))
    print("Number of primes: {}\n".format(np.sum(primes)))


def run_test_sequential(numbers):
    print("Sequential test")
    start_tm = time.time()
    primes = [is_prime(nbr) for nbr in numbers]
    print("Time: {:.6f}".format(time.time() - start_tm))
    print("Number of primes: {}\n".format(np.sum(primes)))


def run_test_multiprocessing(numbers):
    print("Multiprocessing test")
    start_tm = time.time()
    chunk = len(numbers)//3
    primes = None
    with multiprocessing.Pool(processes=3) as pool:
        primes = list(pool.map(is_prime, numbers, chunksize=chunk))
    print("Time: {:.6f}".format(time.time() - start_tm))
    print("Number of primes: {}\n".format(np.sum(primes)))


def main():
    nbr_trails = 5
    for trail in range(nbr_trails):
        numbers = generate_data(trail*10)
        run_test_concurrent(numbers)
        run_test_sequential(numbers)
        run_test_multiprocessing(numbers)
        print("--\n")


if __name__ == '__main__':
    main()

当我在我的机器上运行它时 - Windows 7,四核英特尔酷睿 i5 我得到以下输出:

Concurrent test
Time: 2.006006
Number of primes: 431

Sequential test
Time: 0.010000
Number of primes: 431

Multiprocessing test
Time: 1.412003
Number of primes: 431
--

Concurrent test
Time: 1.302003
Number of primes: 447

Sequential test
Time: 0.010000
Number of primes: 447

Multiprocessing test
Time: 1.252003
Number of primes: 447
--

Concurrent test
Time: 1.280002
Number of primes: 446

Sequential test
Time: 0.010000
Number of primes: 446

Multiprocessing test
Time: 1.250002
Number of primes: 446
--

Concurrent test
Time: 1.260002
Number of primes: 446

Sequential test
Time: 0.010000
Number of primes: 446

Multiprocessing test
Time: 1.250002
Number of primes: 446
--

Concurrent test
Time: 1.282003
Number of primes: 473

Sequential test
Time: 0.010000
Number of primes: 473

Multiprocessing test
Time: 1.260002
Number of primes: 473
--

我的问题是,我是否可以通过在 WindowsPython 3.6.4 |Anaconda, Inc.| 上同时运行它来使其更快。我在 SO ( Why is creating a new process more expensive on Windows than Linux? ) 上读到在 Windows 上创建新进程的成本很高。有什么办法可以加快速度吗?我错过了一些明显的东西吗?

我也试过只创建一次Pool,但似乎没什么用。


编辑:

原始代码结构大致如下:

我的代码结构大致是这样的:

class Foo(object):

    def g() -> int:
        # function performing simple calculations
        # single function call is fast (~500 ms)
        pass


def run(self):
    nbr_processes = multiprocessing.cpu_count() - 1

    with multiprocessing.Pool(processes=nbr_processes) as pool:
        foos = get_initial_foos()

        solution_found = False
        while not solution_found:
            # one iteration
            chunk = len(foos)//nbr_processes
            vals = list(pool.map(Foo.g, foos, chunksize=chunk))

            foos = modify_foos()

foos1000 个元素。不可能提前知道算法收敛的速度有多快以及执行了多少次迭代,可能有数千次。

最佳答案

您的设置对多处理并不公平。您甚至包括了不必要的 primes = None 赋值。 ;)

几点:


数据大小

您生成的数据非常少,可以收回流程创建的开销。尝试使用 range(1_000_000) 而不是 range(5000)。在 Linux 上,multiprocessing.start_method 设置为“spawn”(Windows 上的默认设置),这会绘制不同的图片:

Concurrent test
Time: 0.957883
Number of primes: 89479

Sequential test
Time: 1.235785
Number of primes: 89479

Multiprocessing test
Time: 0.714775
Number of primes: 89479

重复使用您的池

不要离开池的 with-block,只要你在你的程序中留下了任何你想稍后并行化的代码。如果您在开始时只创建一次池,那么将池创建包含在您的基准测试中根本没有多大意义。


NumPy

Numpy 部分能够释放全局解释器锁 (GIL)。这意味着,您可以从多核并行中获益,而无需创建进程的开销。如果您无论如何都在做数学,请尝试尽可能多地使用 numpy。使用 numpy 的代码尝试 concurrent.futures.ThreadPoolExecutormultiprocessing.dummy.Pool

关于python - 使用 Python 在 Windows 上实现并发/并行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52246574/

相关文章:

caching - 像redis一样使用Elasticsearch作为键值缓存是否有意义

python - 用python解析dns配置文件

Python/Pygame 按钮按下声音

python - 我在 cmd 中运行 .py 脚本,出现错误 "ModuleNotFoundError"

windows - 在 ISAPI 过滤器中,对于多个进程的通用日志文件来说,什么是好方法?

windows - WCF 托管在 Windows 服务错误中

python - 我需要在 waitress 提供的 Flask api 中使用多重处理,如何实现?

python - 使用 Web2Py 创建通用 Controller 功能

python - 使用 python glob 忽略 Windows 隐藏文件

java - 如何充分利用多处理器?