node.js - 如何让等待 Action 完成,然后收到新消息?

标签 node.js rabbitmq microservices nestjs

我正在通过nestjs创建微服务,传输抛出rabbitmq。 如何让微服务依次从队列接收消息,等待上一条消息完成。

  • main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: [`amqp://localhost:5672`],
      queue: 'rmq_queue',
      queueOptions: { durable: false },
      prefetchCount: 1,
    },
  });

  await app.listenAsync();
}

bootstrap();

  • app.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('hello')
  async handleHello(): Promise<void> {
    Logger.log('-handle-');
    await (new Promise(resolve => setTimeout(resolve, 5000)));
    Logger.log('---hello---');
  }
}
  • client.js
const { ClientRMQ } = require('@nestjs/microservices');

(async () => {
  const client = new ClientRMQ({
    urls: ['amqp://localhost:5672'],
    queue: 'rmq_queue',
    queueOptions: { durable: false },
  });

  await client.connect();

  for (let i = 0; i < 3; i++) {
    client.emit('hello', 0).subscribe();
  }
})();

https://github.com/heySasha/nest-rmq

实际输出:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +9ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +12ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +4967ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +1ms

但我期望:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms

最佳答案

您想要的通常是通过消费者的认可来实现的。您可以阅读有关它们的信息 here 。简而言之,预取计数设置为 1 的消费者(在您的例子中为 Nest.js 微服务)只有在确认前一条消息后才会收到新消息。如果您熟悉AWS SQS,此操作类似于从队列中删除消息。

Nest.js 在底层使用 amqplib 与 RabbitMQ 进行通信。 channel creation期间建立消费者确认政策- 您可以看到有一个 noAck 选项。但是,该 channel 是在将 noAck 设置为 true 的情况下创建的 - 您可以检查它 here ,这意味着监听器会在消息传递到您的 @EventHandler 方法时自动确认消息。您可以使用 RabbitMQ 管理插件来验证这一点,该插件提供方便的 UI 并能够检查传输中的未确认消息。

我未能在 Nest.js 源代码和文档中找到任何有用的信息。但这可能会给你一个提示。

关于node.js - 如何让等待 Action 完成,然后收到新消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56129216/

相关文章:

node.js - Nodejs + Hapi + SSL/TLS + OATH2 + JWTs 作为承载

rabbitmq - spring-cloud 环境中的分布式 Rabbitmq

docker - 使用Docker Compose或Kubernetes时动态服务发现如何工作?

java - 将 JPA 嵌入式实体类 id 映射到可嵌入实体类 id

node.js - 将 ORM 中模型的输出更改为我自己的 ORM 中的数据格式 - sequilize Node

node.js - 如何在 worker 之间共享动态对象?

linux - 如何在 Amazon Linux 上安装 rabbitmq?

python - pika.exceptions.ProbableAuthenticationError 尝试将消息发送到远程队列时出错

oracle - Weblogic 上的 Netflix OSS/Spring Cloud

javascript - 联系表单发送按钮抛出错误