node.js - Nodejs kafka消费者无限循环

标签 node.js apache-kafka kafka-consumer-api mq

我正在 ubuntu 16.04 机器上运行 kafka_2.11-2.0.0。创建一个主题并从命令行界面向其生成一些消息。

enter image description here

并从命令行启动消费者,它消耗得很好。

enter image description here

但是当我像下面这样启动nodejs消费者时,它正在无限迭代。我的客户端代码中是否缺少任何内容?

var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(
    client,
    [
        {topic: 'mytopic', partition: 0}
    ],
    {
        autoCommit: true
    }
);
consumer.on('message', function (message) {
        console.log(message);
});
consumer.on('error', function (err){
        console.log(err);

})
consumer.on('offsetOutOfRange', function (err){
        console.log(err);
        process.exit();
})

这是输出。

{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: 'message2',
  offset: 1,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: 'message3',
  offset: 2,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: 'message2',
  offset: 1,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: 'message3',
  offset: 2,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: 'message2',
  offset: 1,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: 'message3',
  offset: 2,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }

最佳答案

终于发现kafka新版本2.0.0的问题。所以我转移到以前的版本,它现在可以工作了。

关于node.js - Nodejs kafka消费者无限循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52118302/

相关文章:

mysql - 无法使用 Apache Kafka 和 Debezium 从联合 MySQL 表中读取数据

apache-storm - 将storm的字数统计拓扑与kafka集成

node.js - 如何为部署到 Heroku 的 Node +mongo 应用程序播种数据?

javascript - 如何为我的应用程序使用带有 `npm start` 的 Node 检查器?

java - Kafka Streams 在分组和聚合时转换为 KTable 的字符串问题

java - 无法从所有分区获取 Kafka 滞后

spring - 如何按类型消费来自Kafka的消息

java - 如何获取 Flink 中一条记录的 Kafka 时间戳?

node.js - 一个字段在 Mongoose 模式中可以有哪些选项?

node.js - API - 聚合应该在前端还是后端应用程序中完成?