python-3.x - 如何为 Python Asyncio 创建自定义事件

标签 python-3.x python-asyncio mesos marathon

Mesos 调度程序 Marathon有一个异步 HTTP API。例如。当通过将 JSON 发布到 /v2/apps 来部署应用程序时,会返回部署 ID。然后,该 ID 可用于轮询 /v2/deployments 中的部署状态,或订阅 /v2/events 并查找 deployment_success > 事件。

我想创建一个带有协程的异步 Python 客户端。例如。一旦 deployment_success 事件到达,client.deploy_app(...) 应该返回,但不会阻塞。

如何使用 asyncio 实现这些方法?如何创建事件监听器?感觉事件循环是为此创建的,但我不知道如何注册事件。

最佳答案

可以使用aiohttp模块创建/v2/apps所需的异步post http请求:

import asyncio
import aiohttp


async def post(url, json):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=json) as resp:
            return await resp.json()


async def main():
    res = await post('http://httpbin.org/post', {'test': 'object'})
    print(res['json'])  # {'test': 'object'}


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

如果您想使用 /v2/events 来跟踪部署成功,您应该请求流(请参阅 api doc )。它可以在 aiohttp 中通过它的 asynchronous iteration 来实现。 :您只需逐行异步读取内容,等待您需要的事件,例如:

import asyncio
import aiohttp


async def stream(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            async for line in resp.content:
                yield line


async def main():
    async for line in stream('http://httpbin.org/stream/10'):
        print(line)  # check if line contains event you need


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

如果您想使用/v2/deployments,您应该使用asyncio.sleep定期请求它等待一些延迟。在这种情况下,您的函数不会被阻塞:

import asyncio
import aiohttp


async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()


async def main():
    while True:       
        # Make request to see if deplayment finished:
        res = await get('http://httpbin.org/get')
        print(res['origin'])  # break if ok here
        # Async wait before next try:
        await asyncio.sleep(3)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

关于python-3.x - 如何为 Python Asyncio 创建自定义事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47835371/

相关文章:

python - 如何将某物称为对象而不是列表?

python - 使用 XLSX Writer 将数据透视表写入 Excel 工作簿范围

Python:依次等待协程列表

python - sqs 的异步等待接收消息无法正常工作

python - 在 IOLoop.current().run_in_executor 中运行函数?

Mesos 集群上的 Kubernetes

docker - Docker 集群能在多大程度上降低基础设施成本?

python - 将用户指定的数字添加到文件中已有的数字。 (Python)

python - 列表理解同时迭代两个变量

docker - 使用 mesos、marathon 和 zookeeper 时,当我用 "containerizers"指定 "docker,mesos"文件时,我的 mesos-slave 没有启动?