python - 逐步创建异步任务并等待所有任务完成

标签 python python-3.x python-asyncio aiohttp

我正在尝试制作一个程序来与我创建的服务器建立大量网络套接字连接:

class WebSocketClient():

    @asyncio.coroutine
    def run(self):
        print(self.client_id, 'Connecting')
        ws = yield from aiohttp.ws_connect(self.url)
        print(self.client_id, 'Connected')
        print(self.client_id, 'Sending the message')
        ws.send_str(self.make_new_message())

        while not ws.closed:
            msg = yield from ws.receive()

            if msg.tp == aiohttp.MsgType.text:
                print(self.client_id, 'Received the echo')
                yield from ws.close()
                break

        print(self.client_id, 'Closed')


@asyncio.coroutine
def make_clients():

    for client_id in range(args.clients):
        yield from WebSocketClient(client_id, WS_CHANNEL_URL.format(client_id=client_id)).run()


event_loop.run_until_complete(make_clients())

问题是所有的客户一个接一个地做他们的工作:

0 Connecting
0 Connected
0 Sending the message
0 Received the echo
0 Closed
1 Connecting
1 Connected
1 Sending the message
1 Received the echo
1 Closed
...

我试过使用 asyncio.wait ,但是所有的客户都是一起开始的。我希望它们逐渐创建并在创建它们后立即连接到服务器。同时继续创造新客户。

我应该采用什么方法来实现这一点?

最佳答案

使用 asyncio.wait是一个好方法。您可以将它与 asyncio.ensure_future 结合使用和 asyncio.sleep逐步创建任务:

@asyncio.coroutine
def make_clients(nb_clients, delay):
    futures = []
    for client_id in range(nb_clients):
        url = WS_CHANNEL_URL.format(client_id=client_id)
        coro = WebSocketClient(client_id, url).run()
        futures.append(asyncio.ensure_future(coro))
        yield from asyncio.sleep(delay)
    yield from asyncio.wait(futures)

编辑:我实现了一个 FutureSet 类,它应该做你想做的事。该集合可以填充 future 并在完成后自动将其删除。也可以等待所有 future 完成。

class FutureSet:

    def __init__(self, maxsize, *, loop=None):
        self._set = set()
        self._loop = loop
        self._maxsize = maxsize
        self._waiters = []

    @asyncio.coroutine
    def add(self, item):
        if not asyncio.iscoroutine(item) and \
           not isinstance(item, asyncio.Future):
            raise ValueError('Expecting a coroutine or a Future')
        if item in self._set:
            return
        while len(self._set) >= self._maxsize:
            waiter = asyncio.Future(loop=self._loop)
            self._waiters.append(waiter)
            yield from waiter
        item = asyncio.async(item, loop=self._loop)    
        self._set.add(item)
        item.add_done_callback(self._remove)

    def _remove(self, item):
        if not item.done():
            raise ValueError('Cannot remove a pending Future')
        self._set.remove(item)
        if self._waiters:
            waiter = self._waiters.pop(0)
            waiter.set_result(None)

    @asyncio.coroutine
    def wait(self):
        return asyncio.wait(self._set)

例子:

@asyncio.coroutine
def make_clients(nb_clients, limit=0):
    futures = FutureSet(maxsize=limit)
    for client_id in range(nb_clients):
        url = WS_CHANNEL_URL.format(client_id=client_id)
        client = WebSocketClient(client_id, url)
        yield from futures.add(client.run())
    yield from futures.wait()

关于python - 逐步创建异步任务并等待所有任务完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33172209/

相关文章:

python - PyQt 字体大小随窗口显示配置而变化

python等待shell命令完成

python - 必须在 def __init__ 中声明所有 Python 实例变量吗?

django - RuntimeError : Model class xxx doesn't declare an explicit app_label and isn't in an application in INSTALLED_APPS

python 3 : TypeError: Type str doesn't support the buffer API

websocket - Asyncio : Fastapi with aio-pika, 消费者忽略等待

python - 如何在Python中获取日志文件中的运行命令

python - 为什么使用 Asyncio 没有减少 Python 的整体执行时间和并发运行函数?

python - asyncio - 代码同步执行

python - 具有无限数量参数的 Django urlpattern