我有一个带有 Kafka 订阅者的 node.js 应用程序。 订阅处理程序使用“fetch”调用远程 REST API(等待 fetch(...))。
我尝试处理高频率的消息,REST 调用因远程服务器过载而失败。
重载的发生是因为订阅者 hanler 是异步的。
我的问题是: 有没有一种方法可以确保异步处理程序是序列化的,这样就不会同时调用远程 API 服务器?
克里斯: 我正在使用 kafka-node
这是一个代码示例:
const consumer = new Consumer(this.client, [{ topic: topicKey}]);
consumer.on('message', function (message) {
handleMessage(message)
});
async function handleMessage(message) {
... decode the message
// Send to the Remote server using a REST call
//=> the task is suspended, waiting for the IO, so, meantime, the next message
// is processed, and I flood the remote server of POST requests.
await fetch(...);
}
谢谢。
最佳答案
我不确定你想要实现什么。我知道您的 API 重载了,因为您同时调用了太多 API。
所以,如果我的理解很好,你想同步进行。
正如我在评论中所说,我认为队列是一个不错的选择。这是我的做法(您可能会在其他地方找到更好的方法来实现队列,但我只是给您一个想法 :D)
const consumer = new Consumer(this.client, [{ topic: topicKey}]);
const myQueue = [];
consumer.on('message', function (message) {
myQueue.push(message);
});
async function consumeQueue(){
const message = myQueue.shift();
if(!message){
await sleep(3000);
} else {
// ... decode your message
await fetch(message)
}
consumeQueue();
}
function sleep(ms){
return new Promise(resolve => setTimeout(resolve, ms));
}
// you have to init it :D
consumeQueue();
关于node.js - kafka 消费者和异步处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51823223/