python - 哪个是在 Python 中并行运行多个任务的最佳方式

标签 python multithreading python-asyncio python-multiprocessing python-multithreading

我有一个函数:

import time

def all_40k():
    for _ in range(400000):
        print('validate')
        print('parsing')
        print('inserting')
if __name__ == '__main__':
    start_time = time.time()
    all_40k()
    print(f'used time:{time.time()-start_time}')

输出是:

used time:9.545064210891724

因为这个相同的函数重复了 40k 次,所以我希望同时运行 4 个并行函数,每个函数运行 10k,理想情况下这会快 4 倍。

所以我首先尝试了多线程:

import threading
import time
def first_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


def second_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def third_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def forth_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

thread1 = threading.Thread(target=first_10k)
thread2 = threading.Thread(target=second_10k)
thread3 = threading.Thread(target=third_10k)
thread4 = threading.Thread(target=forth_10k)

thread1.start()
thread2.start()
thread3.start()
thread4.start()
if __name__ == '__main__':
    start_time = time.time()
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    print(f'used time:{time.time()-start_time}')

令我惊讶的是,输出是:

used time:23.058093309402466

然后我尝试了 asyncio:

import time
import asyncio

async def test_1():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_2():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_3():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_4():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def multiple_tasks():
  input_coroutines = [test_1(), test_2(), test_3(),test_4()]
  res = await asyncio.gather(*input_coroutines, return_exceptions=True)
  return res

if __name__ == '__main__':
  start_time = time.time()
  res1, res2 ,res3,res4 = asyncio.get_event_loop().run_until_complete(multiple_tasks())
  print(f'used time:{time.time()-start_time}')

输出是:

used time:9.295843601226807

最后我尝试了 ProcessPoolExecutor:

import time
from concurrent.futures import ProcessPoolExecutor
def data_handler(urls):
    for i in range(urls[0], urls[1]):
        print('validate')
        print('parsing')
        print('inserting')

def run():
    urls = [(1,100000),(100001,200000),(2000001,300000),(300001,400000)]
    with ProcessPoolExecutor() as excute:
        excute.map(data_handler,urls)

if __name__ == '__main__':
    start_time = time.time()
    run()
    stop_time = time.time()
    print('used time %s' % (stop_time - start_time))

输出是:

used time 12.726619243621826

那么我怎样才能加快这个过程呢?我想我走错了路。有 friend 可以帮忙吗?最好的问候!

最佳答案

好的,所以你注意到了什么:

No parallelism   9.545064210891724
asyncio          9.295843601226807
multithreading   12.726619243621826
Thread Pool      23.058093309402466

首先,Asyncio 实际上并不使用线程,如果您能猜到的话,性能依赖于一些 I/O。 Asyncio 在循环中的任务之间交替,每当有人点击 await 时切换。如果不使用 await,它最终只会一次运行一个任务,根本不会切换。

对于线程,由于 Global Interpreter Lock,只有一个线程能够控制 Python 解释器。 .你在这里最终得到的是来自不同线程的一堆争用,所有线程都试图同时工作。这种上下文切换会减慢您的应用程序。与 asyncio 类似,如果您想在等待某些 I/O 的同时安排其他工作,您实际上只会获得这些加速。

好吧,现在肯定多处理案例应该运行得更快..对吧?好吧,每个进程都有自己的解释器锁,但是,阻碍在您的 print 语句中。每个进程都被阻止试图将它们的输出发送到同一个控制台管道。让我用一个例子来告诉你。

假设我们有一个方法要运行 4 次。一次串行一次并行

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(500000):
        print('foobar')
    print(f"Finished thread: {thread}")


def run_singlethreaded():
    start_time = time.time()

    for thread in ["local"] * 4:
        run(thread)

    stop_time = time.time()
    return stop_time - start_time


def run_multiprocessing():
    start_time = time.time()

    with ProcessPoolExecutor(max_workers=4) as ex:
        ex.map(run, ["mp0", "mp1", "mp2", "mp3"])

    stop_time = time.time()
    return stop_time - start_time

if __name__ == '__main__':
    singlethreaded_time = run_singlethreaded()
    multiprocessing_time = run_multiprocessing()
    print(f"Finished singlethreaded in:  {singlethreaded_time}")
    print(f"Finished multiprocessing in: {multiprocessing_time}")

如果我们运行它并打印时间,您会惊讶地看到:

Finished singlethreaded in:  10.513998746871948
Finished multiprocessing in: 12.252000570297241

现在,如果我们将打印更改为更简单的不会导致 IO 瓶颈的内容:

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(100000000):
        pass
    print(f"Finished thread: {thread}")

您将获得预期的并行速度:

Finished singlethreaded in:  9.816999435424805
Finished multiprocessing in: 2.503000020980835

这里重要的一点是,在并行性可以帮助您之前,您需要了解您在哪里受到资源限制。在 IO 绑定(bind)应用程序的情况下,线程或 asyncio 可能会有所帮助。对于 CPU 密集型应用程序,多处理可能很有用。在其他情况下,两者都不会真正帮助您(例如 print 语句),因为瓶颈存在于应用程序外部的系统中。希望这对您有所帮助!

关于python - 哪个是在 Python 中并行运行多个任务的最佳方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67411039/

相关文章:

python - 查找第一个范围包含数字(Numpy、Pandas)

java - 如何优雅地停止正在运行阻塞操作的线程?

c++ - 在 C++ 中使用多线程绘制多边形

python-3.x - 使用 aiohttp.post 我如何将一些数据传递给它以进行迭代

Python asyncio 不显示任何错误

python - 如何从子窗口获取 GUI 主窗口中的数据?

python - 使用 Python 进行图像识别

python - 如何处理paramiko中的[Errno -2]?

iphone - 在 MFMailComposeViewController 中附加多张照片的内存泄漏

task - 是否可以在异步 python 中挂起和重新启动任务?