python - 通过使用多进程和多线程的 asyncio 提高 "Sending 100,000 request"的速度

标签 python multithreading python-asyncio aiohttp

首先,我想尽可能快地使用 1 个连接发送多个请求。下面的代码运行良好且快速,但我希望它超越异步。回到我的问题,是否可以使用多线程或多处理并行运行它。我听说您可以使用 ThreadPoolExecutor 或 ProcessPoolExecutor。

import random
import asyncio
from aiohttp import ClientSession
import time
from concurrent.futures import ProcessPoolExecutor

async def fetch(sem,url, session):
    async with sem:
        async with session.get(url) as response:
            return await response.read()
async def run(r):
    url = "http://www.example.com/"
    tasks = []
    sem = asyncio.Semaphore(1000)
    async with ClientSession() as session:
        for i in range(r):
            task = asyncio.ensure_future(fetch(sem, url.format(i), session)) #return a task
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses
if __name__ == "__main__":
    number = 10000
    loop = asyncio.get_event_loop()
    start = time.time()
    loop.run_until_complete(run(number))
    end = time.time() - start
    print (end)

根据测试,它成功地在 49 秒内发送了大约 10k 个请求。 我需要它更快,有什么建议吗? (线程,进程)

最佳答案

ProcessPoolExecutor 是一种进行真正多处理的方法。 对于您的用例,基本上就像您同时启动程序的多个副本一样。如果您的机器具有所需的带宽和 CPU,您应该能够通过使用 ProcessPoolExecutor(max_workers=4) 将性能提高 4

然而,您将需要在每个子进程中有一个异步事件循环,因此您可以这样做:

def main(n):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(n))


with concurrent.futures.ProcessPoolExecutor(max_workers=4) as exc:
    exc.submit(main, 2500)
    exc.submit(main, 2500)
    exc.submit(main, 2500)
    exc.submit(main, 2500)

作为您的 run 的旁注功能:你也不需要使用ensure_future或任务,async def 的结果function 是一个协程,你直接等待或传递给 asyncio.gather

async def run(r):
    url = "http://www.example.com/"
    sem = asyncio.Semaphore(1000)
    async with ClientSession() as session:
        coros = [fetch(sem, url.format(i), session) for i in range(r)]
        await asyncio.gather(*coros)

关于python - 通过使用多进程和多线程的 asyncio 提高 "Sending 100,000 request"的速度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40385691/

相关文章:

java - 从 UncaughtExceptionHandler 中重新执行任务?

python - asyncio start_server 超时问题

javascript - 如何将变量从 javascript 表单传递给 python(在同一台计算机上)?

python - 可以从代码中将 IPDB/Celery-RDB 堆栈跟踪 ('where' )打印到标准输出吗?

python - 填充和 reshape pandas 数据框

java - 我真正需要多少个线程?

python - 如何将带有斯堪的纳维亚字符的 UTF 字符串转换为 ASCII?

c# - 我应该如何处理停止的、死亡的线程?活着 == 假

python - 异步 : warn about long-running handlers

python - 有没有办法直接在Jupyter cell中调用await?