我正在使用 NestJS 和 typescript 开发两个微服务,用于在两个不同的队列上消费和发布消息。场景如下
- 消费者微服务从 Kafka 主题或队列读取消息
- 然后它会将此消息发送到第二个微服务。
问题出现在第二步,消息从未由消费者微服务发送。
<小时/>消息消费是通过库kafka-node
中的Consumer
实现的。消息被正确使用并且可以被记录。
微服务之间的通信是通过包 @nestjs/microservices
中的 RedisClient
进行的,并且在从监听器方法外部执行时可以正常工作。
当微服务之间的消息从其他任何地方发送时,我可以在 redis-cli MONITOR
上看到它们,但是当从监听器函数发送时,什么也不会出现。
代码
Redis 客户端和 Kafka 消费者初始化:
private readonly kafkaClient: kafka.KafkaClient;
private readonly consumer: kafka.Consumer;
private readonly client: ClientRedis;
constructor() {
this.kafkaClient = new kafka.KafkaClient(KAFKA_HOST);
this.consumer = new kafka.Consumer(this.kafkaClient, TOPICS, OPTIONS);
this.client = new RedisClient(REDIS_URL);
}
消费消息的监听函数是:
async onApplicationBootstrap() {
this.consumer.on('message', message => {
// Sends the message to Redis
this.client.send<Message>(PATTERN, message);
});
}
然后,第二个微服务应该通过 @MessagePattern(PATTERN)
NestJS 的装饰器获取消息。但正如我之前所说,它甚至没有发布在 Redis 上。
最佳答案
好的,问题出在 ClientRedis::send()
方法返回的 Observable
上。只需将 Observable
转换为 Promise
就解决了问题:
this.client.send<Message>(PATTERN, message).toPromise()
关于javascript - 如何从 EventEmitter 监听器函数调用 NestJs 微服务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56277220/