python - 使用 asyncio 发出 100 个请求后,并行请求会无限阻塞

标签 python concurrency python-asyncio aiohttp httpx

我尝试过使用 httpx 和 aiohttp,两者都有这个硬编码限制。

import asyncio

import aiohttp
import httpx


async def main():
    client = aiohttp.ClientSession() 
    # client = httpx.AsyncClient(timeout=None)

    coros = [
        client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",},
        )
        for _ in range(500)
    ]

    for i, coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i, end=", ")


asyncio.run(main())

输出 -

0、1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20、21、22、23 , 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48 , 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73 , 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98 , 99

两个库的值都停留在 99

如果每个请求都使用新的 session ,则不会发生这种情况。

我做错了什么? asyncio 的全部意义不就是让事情变得如此简单吗?


我尝试用线程、zmq 和请求重写它,效果很好 -

import zmq

N_WORKERS = 100
N_ITERS = 500

ctx = zmq.Context.instance()


def worker():
    client = requests.Session()

    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://#1")

    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://#2")

    while True:
        if not pull.recv_pyobj():
            return

        r = client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",},
        )
        push.send_pyobj(r.content)


def ventilator():
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://#1")

    # distribute tasks to all workers
    for _ in range(N_ITERS):
        push.send_pyobj(True)

    # close down workers
    for _ in range(N_WORKERS):
        push.send_pyobj(False)



# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
    t.start()

# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")

for i in range(N_ITERS):
    pull.recv_pyobj()
    print(i, end=", ")

# wait for workers to exit
for t in threads:
    t.join()

最佳答案

问题是您 client.get(...) 返回一个带有操作系统级套接字实时句柄的请求对象。未能关闭该对象会导致 aiohttp 耗尽套接字,即达到连接器限制,即 100 by default .

要解决此问题,您需要关闭 client.get() 返回的对象,或使用 async with 这将确保对象在以下情况下立即关闭: with block 已完成。例如:

async def get(client):
    async with client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",}) as resp:
        pass

async def main():
    async with aiohttp.ClientSession() as client:
        coros = [get(client) for _ in range(500)]
        for i, coro in enumerate(asyncio.as_completed(coros)):
            await coro
            print(i, end=", ", flush=True)

asyncio.run(main())

此外,还应该关闭 aiohttp.ClientSession 对象,这也可以使用 async with 来完成,如上所示。

关于python - 使用 asyncio 发出 100 个请求后,并行请求会无限阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63447722/

相关文章:

python - 使用python在多个不同的文件夹中创建类似的多个子文件夹

java - concurrntHashMap 能否同时保证真正的线程安全和并发?

java - 处理来自客户端的多个请求以更新 Spring 应用程序中表中的列

python - 使用 asyncio.gather 时正确捕获 aiohttp TimeoutError

Python 异步调试示例

python - 如何不错过 itertools.takewhile() 之后的下一个元素

python - 如何根据条件复制数据框行

python - 将文本从网站复制到文本/excel 文件

MySQL 隔离级别性能

python - 使用特定信号终止通过 asyncio 运行的外部程序