node.js - Celery(Django)+RabbitMQ+nodejs服务器交换数据

标签 node.js django rabbitmq celery

我有一个 Django 项目,使用 CeleryRabbitMQ 代理。现在我想从 NodeJS 服务器调用 django (celery) 任务。

在我的 NodeJS 中,我使用 amqplib。这允许我将任务发送到 RabbitMQ:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'celery';

    ch.assertQueue(q, {durable: true});
    ch.sendToQueue(q, new Buffer('What should I write here?'));
  });
});

我的问题是 celery 使用什么格式?我应该向 Buffer 写入什么来调用 celery Worker?

例如,在我的 CELERY_ROUTES (django 设置)中,我有 blabla.tasks.add:

CELERY_ROUTES = {
   ...
   'blabla.tasks.add': 'high-priority',
}

如何调用这个blabla.tasks.add函数?

我尝试了很多方法,但 celery worker 给我错误:收到并删除了未知消息。目的地错误?!?

最佳答案

我找到了解决方案http://docs.celeryproject.org/en/latest/internals/protocol.html在这里。

消息格式示例为:

{ “id”:“4cc7438e-afd4-4f8f-a2f3-f46567e7ca77”, "任务": "celery.task.PingTask", “参数”:[], “kwargs”:{}, “重试”:0, “eta”:“2009-11-17T12:30:56.527191” }

所以代码应该是:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'celery';

    ch.assertQueue(q, {durable: true});
    ch.sendToQueue(q, new Buffer('{"id": "this-is-soo-unique-id", "task": "blabla.tasks.add", "args": [1, 2], "kwargs": {}, "retries": 0}'), {
      contentType: 'application/json',
      contentEncoding: 'utf-8',
    });
  });
});

关于node.js - Celery(Django)+RabbitMQ+nodejs服务器交换数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39997697/

相关文章:

node.js - Docker (compose) 和套接字 io - 如何链接容器

node.js - 尝试通过 mongoose 使用 Node js 将对象保存在文档上时出现问题

django - request.META 不包含从curl -H 传递的 header

python - Django 自定义登录页面

java - 如何在 RABBITMQ 中为消费者端设置时间

javascript - 一种型号的两个版本

node.js - 找不到任何nodejs模块AWS Elastic Beanstalk

python - 如何在 Django 2 中实现 Django-Private-Chat

java - 如何强制Rabbit MQ重新累积并发送消息?

python - celery :删除超过 5 分钟的空队列?