javascript - 如何为kafka主题创建消费者?

标签 javascript node.js apache-kafka producer-consumer

我在kafka服务器上创建了主题,现在我正在创建消费者来从服务器读取主题消息,但是当我尝试使用consumer.on('message')消费消息时,我没有看到任何数据。 ,知道下面的代码中实现了什么错误吗,我需要设置偏移量吗?

消费者.js

var kafka = require('kafka-node');
var config = require('./config.js');
var zk = require('node-zookeeper-client');
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;

function start() {
    topics = [{
        topic: 'test-1'
    }];
    var groupId = 'push';
    var clientId = "consumer-" + Math.floor(Math.random() * 10000);
    var options = {
        autoCommit: true,
        fetchMaxWaitMs: 100,
        fetchMaxBytes: 10 * 1024 * 1024,
        groupId: groupId
    };
    console.log("Started consumer: ", clientId);
    var consumer_client = new kafka.Client(kafkaConn, clientId);
    var client = new Client(consumer_client.connectionString, clientId);
    var consumer = new HighLevelConsumer(client, topics, options);
    consumer.on('message', function(message) {
        var topic = message.topic;
        console.log('Message', topic);
    });

};

start();

最佳答案

const Kafka = require("node-rdkafka");

const kafkaConf = {
  "group.id": "cloudkarafka-example",
  "metadata.broker.list": ["localhost:9092"],
  "socket.keepalive.enable": true,
  //"security.protocol": "SASL_SSL",
  //"sasl.mechanisms": "SCRAM-SHA-256",
  //"sasl.username": process.env.CLOUDKARAFKA_USERNAME,
  //"sasl.password": process.env.CLOUDKARAFKA_PASSWORD,
  "debug": "generic,broker,security",
  'enable.auto.commit': false,
};

//const prefix = process.env.CLOUDKARAFKA_USERNAME;
const topics = ['topicName'];
const consumer = new Kafka.KafkaConsumer(kafkaConf, {
  "auto.offset.reset": "beginning"
});

consumer.on("error", function(err) {
  console.error(err);
});
consumer.on("ready", function(arg) {
  console.log(`Consumer ${arg.name} ready`);
  consumer.subscribe(topics);
  consumer.consume();
});

consumer.on("data", function(m) {
 console.log(m.value.toString());
});
consumer.on("disconnected", function(arg) {
  process.exit();
});
consumer.on('event.error', function(err) {
  console.error(err);
  process.exit(1);
});
consumer.on('event.log', function(log) {
  console.log(log);
});
consumer.connect();

关于javascript - 如何为kafka主题创建消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37977605/

相关文章:

node.js - Express 4 Handlebars 无布局渲染

Node.js 在 url 上隐藏 html 名称

apache-kafka - 分区重新分配在 Kafka 1.1.0 中失败

java - 如何使 java 6 兼容 Kafka?

javascript - MathJax SVG 看起来很差

javascript - 为什么 Meteor Router 函数总是运行两次

node.js - 通天塔 : "The keyword ' await' is reserved (53:24)"

javascript - 如何使用kafka-node从主题读取数据?

javascript - 为什么 Babel 不参与我的 React Native 项目?

javascript - 更改 Wordpress 网站下拉框中的元素