python - 使用异步服务器的长时间运行的任务

标签 python websocket tornado python-asyncio aiohttp

我想每个人都知道在django中如何处理长时间运行的任务:使用celery并放松。但是,如果我想通过 aiohttp(或 tornado)获得 websockets 的好处怎么办?

假设我有一个 CPU 密集型任务,可能需要几秒到数 (5-10) 分钟。在 websocket 循环中处理这个任务并通知用户进度看起来是个不错的主意。没有 ajax 请求,对短任务的响应非常快。

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:     
            answer_to_the_ultimate_question_of_life_the_universe_and_everything =\
                long_running_task(msg.data, NotificationHelper(ws))
            ws.send_str(json.dumps({
                'action': 'got-answer',
                'data': answer_to_the_ultimate_question_of_life_the_universe_and_everything,
            }))
    return ws

但另一方面,据我所知,以这种方式服务的 CPU 绑定(bind)任务会阻塞整个线程。如果我有 10 个工作人员和 11 个客户想要使用应用程序,则在第一个客户的任务完成之前不会为第 11 个客户提供服务。

也许,我应该在 celery 中运行看起来很大的任务,在主循环中运行看起来很小的任务?

那么,我的问题是:有没有什么好的设计模式可以使用异步服务器来处理长时间运行的任务?

谢谢!

最佳答案

只需通过 loop.run_in_executor() 运行您的长时间运行的 CPU 密集型任务,并通过 loop.call_soon_threadsafe() 发送进度通知。

如果您的工作不是 CPU 而是 IO 绑定(bind)(例如发送电子邮件),您可以通过 loop.create_task() 调用创建一个新任务。看起来像是在生成新线程。

如果您不能使用“即发即弃”方法,您需要使用像 RabbitMQ 这样的持久消息代理(有 https://github.com/benjamin-hodgson/asynqp 库以异步方式与 Rabbit 通信)。

关于python - 使用异步服务器的长时间运行的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35355849/

相关文章:

python - 为什么使用闭包将函数绑定(bind)到对象是错误的?

go - 打开数据通道的请求不包含 token

django - Tornado 还是 Node ? express 呢? Node Django?

python - 基于协程的状态机

python - 有没有办法防止 VSC 中的自动格式化程序更改某些行/代码段?

python - 导入 Apex 时出现 `UnencryptedCookieSessionFactoryConfig` 错误

python - HTTP 回调 URL 与 WebSocket 的异步响应?

php - 有没有办法在 Heroku 中使用 PHP WebSocket?,默认情况下似乎只支持其他技术

python - 如何在同一测试用例中使用假设和基于 pytest-tornado 产量的测试?

python - 具有线性回归模型的 fillna 从数据框 pandas 中的两列构建