Celery 包含一个模块,该模块能够使用 amqp 或其他一些 celery 后端发出异步 HTTP 请求。我正在使用 tornado-celery异步消息发布的生产者。据我了解tornado-celery为此使用鼠兔。问题是如何为 Tornado 调整 celery.task.http.URL(使其成为非阻塞)。基本上有两个地方需要细化:
HttpDispatch.make_request()
必须使用 tornado 异步 http 客户端实现;URL.get_async(**kw)
或URL.post_async(**kw)
必须使用 tornado API 以相应的非阻塞代码重新实现。例如:class NonBlockingURL(celery.task.http.URL): @gen.coroutine def post_async(self, **kwargs): async_res = yield gen.Task(self.dispatcher.delay, str(self), 'POST', **kwargs) raise gen.Return(async_res)
但我不明白如何以正确和简洁的方式做到这一点。如何让它完全像异步一样非阻塞?顺便说一句,我正在使用 amqp 后端。
请给我一个很好的指导方针,或者更好的例子。
最佳答案
其实你要决定是用Tornado的async方法还是用cellery这样的queue。没有必要同时使用两者,因为队列会快速响应队列的状态,因此 Tornado 在等待队列响应时做其他事情是没有意义的。要在两种解决方案之间做出决定,我会说:
Celery:更加模块化,易于分发到不同的核心或不同的机器,任务可以被除 tornado 之外的其他人使用,你必须安装并保持运行软件(amqp,cellery workers ...)
Tornado 中的异步:更单一,一个程序做所有事情,更短的代码,一个程序运行
要使用Tornado的async方法,请引用文档。 这是一起使用 celery 和 Tornado 的简短解决方案:
任务.py
from celery import Celery,current_task
import time
celery=Celery('tasks',backend='amqp',result_backend='amqp')
@celery.task
def MyTask(url,resid):
for i in range(10):
time.sleep(1)
current_task.update_state(state='running',meta={'i': i})
return 'done'
服务器.py
import tasks
from tornado import RequestHandler,....
from tornado.web import Application
dictasks={}
class runtask(RequestHandler):
def post(self):
i=len(dictasks)
dictasks[i]=task.MyTask.delay()
self.write(i)
class chktask(RequestHandler):
def get(self,i):
i=int(i)
if dictasks[i].ready():
self.write(dictasks[i].result)
del dictasks[i]
else:
self.write(dictasks[i].state + ' i:' + dictasks[i].info.get('i',-1))
Application = Application([
(r"/runtask", runtask}),
(r"/chktask/([0-9]+)", chktask),
etc.
关于python - 为 Tornado 调整 celery.task.http.URL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18711215/