我有一个 Django
项目,使用 Celery
和 RabbitMQ
代理。现在我想从 NodeJS 服务器调用 django (celery) 任务。
在我的 NodeJS
中,我使用 amqplib
。这允许我将任务发送到 RabbitMQ
:
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'celery';
ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer('What should I write here?'));
});
});
我的问题是 celery 使用什么格式?我应该向 Buffer 写入什么来调用 celery Worker?
例如,在我的 CELERY_ROUTES
(django 设置)中,我有 blabla.tasks.add
:
CELERY_ROUTES = {
...
'blabla.tasks.add': 'high-priority',
}
如何调用这个blabla.tasks.add
函数?
我尝试了很多方法,但 celery worker 给我错误:收到并删除了未知消息。目的地错误?!?
最佳答案
我找到了解决方案http://docs.celeryproject.org/en/latest/internals/protocol.html在这里。
消息格式示例为:
{
“id”:“4cc7438e-afd4-4f8f-a2f3-f46567e7ca77”,
"任务": "celery.task.PingTask",
“参数”:[],
“kwargs”:{},
“重试”:0,
“eta”:“2009-11-17T12:30:56.527191”
}
所以代码应该是:
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'celery';
ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer('{"id": "this-is-soo-unique-id", "task": "blabla.tasks.add", "args": [1, 2], "kwargs": {}, "retries": 0}'), {
contentType: 'application/json',
contentEncoding: 'utf-8',
});
});
});
关于node.js - Celery(Django)+RabbitMQ+nodejs服务器交换数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39997697/