python - 如何使异步池可取消?

标签 python pool python-asyncio

我有一个 pool_map 函数,可用于限制同时执行函数的数量。

这个想法是有一个 coroutine function接受映射到可能参数列表的单个参数,但也将所有函数调用包装到信号量获取中,因此一次只有有限数量的运行:

from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore

A = TypeVar('A')
V = TypeVar('V')

async def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int=10
) -> Generator[Awaitable[V], None, None]:
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)

为了举例,我修改了上面的代码,但没有测试上面的代码,但我的变体运行良好。例如。你可以像这样使用它:

from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing

URLS = [...]

async def run_all(awaitables):
    for a in as_completed(awaitables):
        result = await a
        print('got result', result)

async def download(url): ...


if __name__ != '__main__':
    pool = pool_map(download, URLS)

    with closing(get_event_loop()) as loop:
        loop.run_until_complete(run_all(pool))

但是如果在等待 future 时抛出异常,就会出现问题。我不知道如何取消所有已计划或仍在运行的任务,以及仍在等待获取信号量的任务。

是否有我不知道的库或优雅的构建 block ,或者我必须自己构建所有部件? (即可以访问其等待者的 Semaphore,可以访问其正在运行的任务队列的 as_finished,...)

最佳答案

使用ensure_future来获取Task而不是协程:

import asyncio
from contextlib import closing


def pool_map(func, args, size=10):
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = asyncio.Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    tasks = [asyncio.ensure_future(sub(x)) for x in args]

    return tasks


async def f(n):
    print(">>> start", n)

    if n == 7:
        raise Exception("boom!")

    await asyncio.sleep(n / 10)

    print("<<< end", n)
    return n


async def run_all(tasks):
    exc = None
    for a in asyncio.as_completed(tasks):
        try:
            result = await a
            print('=== result', result)
        except asyncio.CancelledError as e:
            print("!!! cancel", e)
        except Exception as e:
            print("Exception in task, cancelling!")
            for t in tasks:
                t.cancel()
            exc = e
    if exc:
        raise exc


pool = pool_map(f, range(1, 20), 3)

with closing(asyncio.get_event_loop()) as loop:
    loop.run_until_complete(run_all(pool))

关于python - 如何使异步池可取消?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41677434/

相关文章:

python - 如果用户点击后退按钮,如何在 django View 中跳过插页式广告?

python - 带有嵌套网络请求的 Gevent 池

c# - 数据库重新联机后连接请求超时

javascript - 在其他 Node javascript 文件中重用 postgresql 池

python-3.x - 在 Python 中追加到合并的异步生成器

python - 在 grpc python 中处理异步流请求

python - TLS 连接超时(以及其他一些困难)

python - 你应该总是使用正则化 tensorflow 吗?

具有队列 : asyncio. 的 Python 3.8 websocket 回显客户端 Queue get() 不会即时添加队列项

python - 何时在 PySide 中设置父级