node.js - 如何在node-rdkafka中一条一条读取消息

标签 node.js apache-kafka librdkafka

我正在使用 node-rdkafka ( https://github.com/Blizzard/node-rdkafka ) 来消费消息,基本设置工作正常,但每次我将某些内容推送到队列时,无论之前的方法是否完成,它都会触发该函数。

我希望在上一个函数完成时触发下一个数据单元。

这是我的实现

const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const consumer = new Kafka.KafkaConsumer({
    'group.id':'consumer',
    'metadata.broker.list': '*******',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '********',
    'sasl.password': '********',
    'security.protocol': 'SASL_SSL',
    'enable.auto.commit':false
}, {});

// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
    if (err) {
        console.log(`Error connecting to Kafka broker: ${err}`);
        process.exit(-1);
    }

});
let is_pause = false;
consumer.on('ready', (arg)=>{
    console.log('consumer ready.' + JSON.stringify(arg));
    console.log('Consumer is ready');
    consumer.subscribe([topic]);
    setInterval(function() {
        console.log('consumer has consume on :'+timeMs());  
        consumer.consume();
      }, 1000);
});

consumer.on('data',async (data)=>{
    console.log('consumer is consuming data');
    if(!is_pause) {
        is_pause = true;
        if(data && typeof data !== 'undefined') {
            try {
                console.log('consumer received the data');
                consumer.pause([topic]);
                console.log('consumer has pause the consuming');
                await processMessage(data);
                console.log('consumer is resumed');
                consumer.resume([topic]);
                is_pause = false;
            } catch(error) {
                console.log('data consuming error');
                console.log(error);
            }
        } else {
            is_pause = false;
        }
    }
});


最佳答案

您正在调用 consume() (不带任何参数),它会尽快返回消息。

如果你想控制消费速度,可以使用另一个方法consume(size),它返回size条Kafka记录。例如 consume(1) 将返回下一条 Kafka 记录。

请参阅node-rdkafka Consumer docs .

关于node.js - 如何在node-rdkafka中一条一条读取消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59928390/

相关文章:

node.js - Nodejs child_process 生成自定义 stdio

kubernetes - 为什么要在 Kubernetes 中为 Kafka 使用 headless 服务,为什么不使用开箱即用的负载平衡集群 IP?

bazel rules_go : linking go binary against a static c++ library (. a file) 由工作区中的另一个目标生成

go - 在 Debian Docker 镜像中使用 librdkafka 构建 Golang 应用程序?

javascript - 正则表达式获取 API 表单 URL 的版本

node.js:如何检测空的标准输入流?

node.js - 缺少架构错误 : Schema hasn't been registered for model "Users" when populate

apache-kafka - 卡夫卡有重复的消息

java - 创建 Kafka Producer 发送列表中的每条消息

c++ - CMake 未找到 librdkafka