python - 为 Tornado 调整 celery.task.http.URL

标签 python asynchronous celery tornado nonblocking

Celery 包含一个模块,该模块能够使用 amqp 或其他一些 celery 后端发出异步 HTTP 请求。我正在使用 tornado-celery异步消息发布的生产者。据我了解tornado-celery为此使用鼠兔。问题是如何为 Tornado 调整 celery.task.http.URL(使其成为非阻塞)。基本上有两个地方需要细化:

  1. HttpDispatch.make_request() 必须使用 tornado 异步 http 客户端实现;
  2. URL.get_async(**kw)URL.post_async(**kw) 必须使用 tornado API 以相应的非阻塞代码重新实现。例如:

    class NonBlockingURL(celery.task.http.URL):
    
        @gen.coroutine
        def post_async(self, **kwargs):
            async_res = yield gen.Task(self.dispatcher.delay, 
                                       str(self), 'POST', **kwargs)
            raise gen.Return(async_res)
    

但我不明白如何以正确和简洁的方式做到这一点。如何让它完全像异步一样非阻塞?顺便说一句,我正在使用 amqp 后端。

请给我一个很好的指导方针,或者更好的例子。

最佳答案

其实你要决定是用Tornado的async方法还是用cellery这样的queue。没有必要同时使用两者,因为队列会快速响应队列的状态,因此 Tornado 在等待队列响应时做其他事情是没有意义的。要在两种解决方案之间做出决定,我会说:

Celery:更加模块化,易于分发到不同的核心或不同的机器,任务可以被除 tornado 之外的其他人使用,你必须安装并保持运行软件(amqp,cellery workers ...)

Tornado 中的异步:更单一,一个程序做所有事情,更短的代码,一个程序运行

要使用Tornado的async方法,请引用文档。 这是一起使用 celery 和 Tornado 的简短解决方案:

任务.py

 from celery import Celery,current_task
 import time
 celery=Celery('tasks',backend='amqp',result_backend='amqp')

 @celery.task
 def MyTask(url,resid):
     for i in range(10):
         time.sleep(1)
         current_task.update_state(state='running',meta={'i': i})
     return 'done'

服务器.py

 import tasks
 from tornado import RequestHandler,....
 from tornado.web import Application
 dictasks={}
 class runtask(RequestHandler):
     def post(self):
         i=len(dictasks)
         dictasks[i]=task.MyTask.delay()
             self.write(i)

 class chktask(RequestHandler):
     def get(self,i):
         i=int(i)
         if dictasks[i].ready():
             self.write(dictasks[i].result)
             del dictasks[i]
         else:
             self.write(dictasks[i].state + ' i:' + dictasks[i].info.get('i',-1))


 Application = Application([
     (r"/runtask", runtask}),
     (r"/chktask/([0-9]+)", chktask),

 etc.

关于python - 为 Tornado 调整 celery.task.http.URL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18711215/

相关文章:

python - django celery 和弦任务错误

Python 列表帮助

Scala 异步/等待和并行化

c++ - 为什么GRPC AsyncClient在等待完成队列中的下一个结果时抛出Segfault

python - 从celery后端(redis)获取数据

python - 对多进程 API 调用者应用速率限制/throttle 的最佳方法 - 使用 Celery

python - Django 模型和遗留类集成

python - 将函数应用于已有的日期列

python - 如何在现有的旧 venv 中将 Django 与 Python 3 结合使用?

c# - 如何在 C# 中捕获等待的异步方法的异常?