javascript - npm kafka- Node : Close consumer after final message read

标签 javascript node.js apache-kafka

我想编写一个函数,一旦主题中的最后一条消息被读取,该函数将调用回调。

function getCurrentMessages(kafka, topic, cb_done){
  // Start consuming from the beginning
  var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true});
  consumer.on('message', function(msg){
    // Do something with msg
  });
  consumer.on('final-message-received', function(){
    consumer.close(function(){
      cb_done();
    });
  });
}

当前的库可以实现这一点吗?我不想让消费者保持开放状态来接收新消息。

最佳答案

我认为您可以借助offset的帮助来做到这一点。您可以在Consumer端轻松获取事件偏移量的信息。
KAFKA 为您提供以下提到的偏移量:

1. Earlier offset
2. Current Offset and
3. Latest offset

您只需要获取 Kafka 中写入的最新消息或最后一条消息的偏移量,并在消费事件时将其与当前偏移量进行比较,在这种情况下,当您从 kafka 消费最后一条消息时,您将获得等于当前偏移量的最新偏移量。
您可以编写一个 IF 条件来检查

IF(current_offset == latest_offset)
{
Break the flow or the action which you want;
}

如果它不适合您或需要任何进一步信息,请告诉我。

关于javascript - npm kafka- Node : Close consumer after final message read,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32914324/

相关文章:

javascript - 获取构建错误 "ERROR in NgSemanticModule is not an NgModule "即使在添加依赖项并导入到邮件模块之后

javascript - 运算符 '+' 不能应用于类型 'Number' 和 '1'

javascript - 使用插件压缩 webpack 包

node.js - session ID 显示为未定义的 node.js

java - Kafka Connect - 文件源连接器错误

azure - Mapr Streams to Azure 的 SSL 握手错误

javascript - django 中的 recaptcha 刷新按钮

javascript - React下拉菜单的jss样式

node.js - 未传递 url 参数时不会调用快速路由

java - Kafka Spark Streaming LocationStrategies java类def未找到异常