node.js - kafka 消费者和异步处理程序

标签 node.js apache-kafka kafka-consumer-api

我有一个带有 Kafka 订阅者的 node.js 应用程序。 订阅处理程序使用“fetch”调用远程 REST API(等待 fetch(...))。

我尝试处理高频率的消息,REST 调用因远程服务器过载而失败。

重载的发生是因为订阅者 hanler 是异步的。

我的问题是: 有没有一种方法可以确保异步处理程序是序列化的,这样就不会同时调用远程 API 服务器?

克里斯: 我正在使用 kafka-node

这是一个代码示例:

const consumer = new Consumer(this.client, [{ topic: topicKey}]);
consumer.on('message', function (message) {
  handleMessage(message)
});

async function handleMessage(message) {
   ... decode the message

  // Send to the Remote server using a REST call

  //=> the task is suspended, waiting for the IO, so, meantime, the next message
  //   is processed, and I flood the remote server of POST requests.
  await fetch(...);
}

谢谢。

最佳答案

我不确定你想要实现什么。我知道您的 API 重载了,因为您同时调用了太多 API。

所以,如果我的理解很好,你想同步进行。

正如我在评论中所说,我认为队列是一个不错的选择。这是我的做法(您可能会在其他地方找到更好的方法来实现队列,但我只是给您一个想法 :D)

const consumer = new Consumer(this.client, [{ topic: topicKey}]);
const myQueue = [];

consumer.on('message', function (message) {
    myQueue.push(message);
});

async function consumeQueue(){
    const message = myQueue.shift();

    if(!message){
        await sleep(3000);
    } else {
        // ... decode your message
        await fetch(message)
    }

    consumeQueue();
}

function sleep(ms){
    return new Promise(resolve => setTimeout(resolve, ms));
}

// you have to init it :D
consumeQueue();

关于node.js - kafka 消费者和异步处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51823223/

相关文章:

javascript - 正文较大 (1.3 mb) 的 Node 发布消息错误 : 413 Request Entity Too Large

mysql - nodejs和mysql使用压缩字符串

java - Apache Kafka 异常 : org. apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V

c# - 如何使用分区以便使用 .NET Core C# 并行使用 kafka 中的一个主题?

java - Sprint启动kafka Consumer无法连接到kafka容器

java - 使用spring kafka在kafka消费者中手动提交偏移量的方法

javascript - Uncaught ReferenceError : require is not defined in Electron BrowserWindow

node.js - 使用 RxJs WebSocketSubject 和 Angular Universal 时出现 "ReferenceError: WebSocket is not defined"

java - 使用 Apache Kakfa 的 KafkaConsumer api 时出现类型错误

apache-kafka - Kafka Streams - 来自具有保留策略的主题的 KTable