我正在 ubuntu 16.04 机器上运行 kafka_2.11-2.0.0。创建一个主题并从命令行界面向其生成一些消息。
并从命令行启动消费者,它消耗得很好。
但是当我像下面这样启动nodejs消费者时,它正在无限迭代。我的客户端代码中是否缺少任何内容?
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(
client,
[
{topic: 'mytopic', partition: 0}
],
{
autoCommit: true
}
);
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err){
console.log(err);
})
consumer.on('offsetOutOfRange', function (err){
console.log(err);
process.exit();
})
这是输出。
{ topic: 'mytopic',
value: '',
offset: 0,
partition: 0,
highWaterOffset: 3,
key: '' }
{ topic: 'mytopic',
value: 'message2',
offset: 1,
partition: 0,
highWaterOffset: 3,
key: null }
{ topic: 'mytopic',
value: 'message3',
offset: 2,
partition: 0,
highWaterOffset: 3,
key: null }
{ topic: 'mytopic',
value: '',
offset: 0,
partition: 0,
highWaterOffset: 3,
key: '' }
{ topic: 'mytopic',
value: '',
offset: 0,
partition: 0,
highWaterOffset: 3,
key: '' }
{ topic: 'mytopic',
value: 'message2',
offset: 1,
partition: 0,
highWaterOffset: 3,
key: null }
{ topic: 'mytopic',
value: 'message3',
offset: 2,
partition: 0,
highWaterOffset: 3,
key: null }
{ topic: 'mytopic',
value: '',
offset: 0,
partition: 0,
highWaterOffset: 3,
key: '' }
{ topic: 'mytopic',
value: '',
offset: 0,
partition: 0,
highWaterOffset: 3,
key: '' }
{ topic: 'mytopic',
value: 'message2',
offset: 1,
partition: 0,
highWaterOffset: 3,
key: null }
{ topic: 'mytopic',
value: 'message3',
offset: 2,
partition: 0,
highWaterOffset: 3,
key: null }
{ topic: 'mytopic',
value: '',
offset: 0,
partition: 0,
highWaterOffset: 3,
key: '' }
最佳答案
终于发现kafka新版本2.0.0的问题。所以我转移到以前的版本,它现在可以工作了。
关于node.js - Nodejs kafka消费者无限循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52118302/