我想编写一个函数,一旦主题中的最后一条消息被读取,该函数将调用回调。
function getCurrentMessages(kafka, topic, cb_done){
// Start consuming from the beginning
var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true});
consumer.on('message', function(msg){
// Do something with msg
});
consumer.on('final-message-received', function(){
consumer.close(function(){
cb_done();
});
});
}
当前的库可以实现这一点吗?我不想让消费者保持开放状态来接收新消息。
最佳答案
我认为您可以借助offset的帮助来做到这一点。您可以在Consumer端轻松获取事件偏移量的信息。
KAFKA 为您提供以下提到的偏移量:
1. Earlier offset
2. Current Offset and
3. Latest offset
您只需要获取 Kafka 中写入的最新消息或最后一条消息的偏移量,并在消费事件时将其与当前偏移量进行比较,在这种情况下,当您从 kafka 消费最后一条消息时,您将获得等于当前偏移量的最新偏移量。
您可以编写一个 IF 条件来检查
IF(current_offset == latest_offset)
{
Break the flow or the action which you want;
}
如果它不适合您或需要任何进一步信息,请告诉我。
关于javascript - npm kafka- Node : Close consumer after final message read,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32914324/