我正在通过nestjs创建微服务,传输抛出rabbitmq。 如何让微服务依次从队列接收消息,等待上一条消息完成。
- main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'rmq_queue',
queueOptions: { durable: false },
prefetchCount: 1,
},
});
await app.listenAsync();
}
bootstrap();
- app.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('hello')
async handleHello(): Promise<void> {
Logger.log('-handle-');
await (new Promise(resolve => setTimeout(resolve, 5000)));
Logger.log('---hello---');
}
}
- client.js
const { ClientRMQ } = require('@nestjs/microservices');
(async () => {
const client = new ClientRMQ({
urls: ['amqp://localhost:5672'],
queue: 'rmq_queue',
queueOptions: { durable: false },
});
await client.connect();
for (let i = 0; i < 3; i++) {
client.emit('hello', 0).subscribe();
}
})();
https://github.com/heySasha/nest-rmq
实际输出:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +9ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +12ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +4967ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +1ms
但我期望:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
最佳答案
您想要的通常是通过消费者的认可来实现的。您可以阅读有关它们的信息 here 。简而言之,预取计数设置为 1 的消费者(在您的例子中为 Nest.js 微服务)只有在确认前一条消息后才会收到新消息。如果您熟悉AWS SQS,此操作类似于从队列中删除消息。
Nest.js 在底层使用 amqplib 与 RabbitMQ 进行通信。 channel creation期间建立消费者确认政策- 您可以看到有一个 noAck
选项。但是,该 channel 是在将 noAck
设置为 true
的情况下创建的 - 您可以检查它 here ,这意味着监听器会在消息传递到您的 @EventHandler
方法时自动确认消息。您可以使用 RabbitMQ 管理插件来验证这一点,该插件提供方便的 UI 并能够检查传输中的未确认消息。
我未能在 Nest.js 源代码和文档中找到任何有用的信息。但这可能会给你一个提示。
关于node.js - 如何让等待 Action 完成,然后收到新消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56129216/