node.js - 如何使用 AMQP 以 "PeekLock"模式从 Azure 服务总线队列获取消息?

标签 node.js azure amqp azure-servicebus-queues

我们正在尝试在 Node 应用程序中使用 Azure 服务总线。 我们的要求是从队列中获取多条消息

由于 Azure SDK for Node 不支持批量检索,因此我们决定使用 AMQP。虽然我们能够使用 Peek Messages 获取消息,如此处所述 ( https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-operations )。

我们注意到,一旦获取消息,它们就会从队列中删除。我想知道是否有人了解如何使用 AMQP 和 Node.js 在“PeekLock”模式下获取消息。对于 AMQP,我们使用 amqp10 Node 包 ( https://www.npmjs.com/package/amqp10 )。

这是我们用于查看消息的代码:

const AMQPClient = require('amqp10/lib').Client,
Policy = require('amqp10/lib').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'My Shared Access Key'
const serviceBusHost = 'account-name.servicebus.windows.net';
const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;
const queueName = 'test1';
var client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(function () {
    return Promise.all([
        client.createReceiver(queueName),
        client.createSender(queueName)
    ]);
})
.spread(function(receiver, sender) {
    console.log(receiver);
    console.log(sender);
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', function(err) {
        // check for errors
        console.log(err);
    });
    receiver.on('message', function(message) {
        console.log('Received message');
        console.log(message);
        console.log('------------------------------------');
    });

    return sender.send([], {
        operation: 'com.microsoft:peek-message',
        'message-count': 5
    });
})
.error(function (e) {
    console.warn('connection error: ', e);
});

最佳答案

默认情况下,接收器工作在自动解决模式,您必须将其更改为解决处置:

const { Constants } = require('amqp10')

// 
// ...create client, connect, etc...
//

// Second parameter of createReceiver method enables overwriting policy parameters
const receiver = client.createReceiver(queueName, {
  attach: {
    rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition
  }
})

处理消息后不要忘记接受/拒绝/释放消息:

receiver.on('message', msg => {
  //
  // ...do something smart with a message...
  //

  receiver.accept(msg) // <- manually settle a message
})

关于node.js - 如何使用 AMQP 以 "PeekLock"模式从 Azure 服务总线队列获取消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43361331/

相关文章:

php - 将此 cuRL 转换为 CFHTTP - Azure NotificationHub

python - 如何使用 python 列出或发现 RabbitMQ 交换中的队列?

nservicebus - *nix 生态系统中存在哪些消息总线技术?

node.js - 调试 Composer 事务处理器?

node.js - 找不到模块 : Error: Cannot resolve 'file' or 'directory'

带有嵌入式文档的 MongoDB 数据库设计

javascript - Azure PutBlock 最大块数

angular - 在 Angular 中离线播放 Azure 媒体服务中的视频

memory - 为什么rabbitmq上的二进制内存使用量会增加

node.js - 使用 Nodejs 的服务帐户在 Google 日历中插入事件