node.js - Node.js 中使用 rhea 的 AMQP 1.0 临时队列

标签 node.js rabbitmq activemq rpc amqp

我使用 RabbitMQ 一段时间了。我有几个使用基本 RPC 机制运行的微服务,该机制非常接近 Rabbit tutorials 中描述的机制。 。我尝试使用 rhea 切换到 AMQP 1.0,因为我需要使用 Amazon MQ。然而,我仍然坚持复制这个简单的模式:

ch.assertQueue('', {exclusive: true}, function(err, q) {
 let corr = //some UUID
   ch.consume(q.queue, function(msg) {
     /* */
   });

ch.sendToQueue('rpc_queue',
      "TEST2",
      { correlationId: corr, replyTo: q.queue });
    });
})

我没有从 rhea 那里得到的是可以有临时队列(与客户端连接相关)和然后将“replyTo”发送到这些队列。

我尝试过:

client.open_receiver({
    source: { address: "rpc:callback", expiry_policy: "connection-close" }
  });

使用expiry_policy,但它不起作用。我什至尝试使用带有 AMQP 1.0 插件的 RabbitMQ,然后使用 Apache ActiveMQ。

重点是,我想...

  1. 拥有一个临时(独占)队列,当客户端连接断开时该队列会自动断开。
  2. 使用该临时队列(我可以手动为其分配临时名称,这不是重点)来处理回复。

但是,我既无法获取临时队列(AMQP 0.9.1 独有),也无法使用该名称来处理回复。

最佳答案

const container = require("rhea");
const _logger = require("pino")();
const nanoid = require("nanoid");

const init = ({ config, caller, resources, services, rpcs }) => {
  return new Promise((resolve, reject) => {
    let _rpcs = {};
    let _responses = {};

    const send = (sender, receiver, correlation_id, body) => {
      if (receiver.source.address) {
        sender.send({
          reply_to: receiver.source.address,
          correlation_id,
          body
        });
      }
    };

    container.on("connection_open", context => {
      //RPCS
      rpcs &&
        rpcs.forEach(sendTo => {
          let parts = sendTo.name.split(".");
          _rpcs[parts[0]] = _rpcs[parts[0]] ? _rpcs[parts[0]] : {};

          let sender = context.connection.open_sender(sendTo.name);
          let receiver = context.connection.open_receiver({
            source: { dynamic: true }
          });

          receiver.on("message", context => {
            let correlation_id = context.message.correlation_id;
            if (_responses[correlation_id]) {
              let { resolve, reject } = _responses[correlation_id];
              resolve(context.message.body);
              delete _responses[correlation_id];
            }
          });

          _rpcs[parts[0]][parts[1]] = body =>
            new Promise((resolve, reject) => {
              const correlation_id = nanoid();
              _responses[correlation_id] = { resolve, reject };
              send(sender, receiver, correlation_id, body);
            });
        });

      // SERVICES
      services &&
        services.forEach(service => {
          let receiver = context.connection.open_receiver({
            source: `${resources.name}.${service.name}`,
            //credit_window: 1, //service.prefetch || 500,
            autoaccept: false
          });

          receiver.on("message", async context => {
            let request = context.message;
            let reply_to = request.reply_to;
            let payload = request.body;

            try {
              let response = {
                to: reply_to,
                body: await caller(service.responder)({ payload })
              };
              if (request.correlation_id) {
                response.correlation_id = request.correlation_id;
              }
              context.connection.send(response);
              context.delivery.accept();
            } catch (error) {
              _logger.error(error);
              context.delivery.reject();
            }
          });
        });
    });

    container.on("receiver_open", context => {
      resolve(_rpcs);
    });

    container.on("connection_error", error => _logger.error(error));

    container.connect(config.getResource("amqp"));
  });
};

module.exports = { init };

关于node.js - Node.js 中使用 rhea 的 AMQP 1.0 临时队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52875883/

相关文章:

php - 在 PHP 中解释 JavaScript

python - Django - 在 apache 每个请求创建的所有进程之间共享 RabbitMQ 连接?

java - ActiveMQ 故障转移 : How to detect when an application starts and cannot find a broker

java - 从死信队列恢复

apache - 监控 ActiveMQ 性能的工具

javascript - 将 Node.js 应用程序打包为 OS X 应用程序

javascript - 如何在 TypeScript 中将箭头函数返回值分配给 string[]

java - OffsetDateTime 中的 Z 偏移是什么?

python - Json 字符串解析在从 MSDOS 运行时有效,但在 Windows 上的 Ubuntu 上的 Bash 中无效

javascript - Jade Minified Javascript 未解析为文本 block