javascript - 使用 kafka-node 创建的消费者和生产者无法连接到 kafka 的工作实例

标签 javascript node.js apache-kafka

我尝试通过 kafka-node npm 库使用消费者和生产者,以便通过 nodejs 使用 kafka 作为消息服务。

问题是,即使我的生产者有时工作,消费者总是给我超时错误,或者它只是在无限循环中释放尝试连接到 kafka,即使 kafka 工作正常。

当 Kafka 在远程 centos7 机器上时,我在我的 windows 机器上使用 kafka-node。 即使我将所有代码(消费者和生产者)与 kafka 放在同一台机器上(认为 mabye windows 是问题的一部分),奇怪的行为仍然存在。

我尝试在生产者控制台内置的 kafka 中发送消息,但我的消费者似乎仍然没有响应主题并收到消息。

这是我的简单生产者代码:

const kclient = new kafka.KafkaClient({kafkaHost:'ADDR:9092'});
kclient.on('error',(err) => {
    console.log(err)
})
kprod = new producer (kclient);
// kconsumer = new consumer(kclient);
kprod.on('error',(err) => {
    console.log(`error: ${err}`);
})

kprod.on('ready',() => {
    console.log(`connected to kafka`);
    let tranNumSentToKafka = 0
    for (let index = 0; index < transArray.length; index++) {
        const element = JSON.stringify(transArray[index])
        console.log(`sending data to kafka`);
        kprod.send([{
            topic:'test',
            messages:element
        }],
            (err,data) => { 
            if(err){console.error(err)}
            else{
                tranNumSentToKafka += 1
                console.log(`data sent: ${JSON.stringify(data)}`);
                console.log(`sent ${tranNumSentToKafka} transactions to kafka`);
            }
        })
    }
})

当我运行它时,有时会出现超时错误,如下所示:

{ TimeoutError: Request timed out after 30000ms
    at new TimeoutError (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
    at Timeout.timeoutId._createTimeout [as _onTimeout] (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\kafkaClient.js:1007:14)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) message: 'Request timed out
after 30000ms' }

但有时它会起作用,它会给我以下输出:

...
sent 96 transactions to kafka
data sent: {"test":{"0":15740}}
sent 97 transactions to kafka
data sent: {"test":{"0":15741}}
sent 98 transactions to kafka
data sent: {"test":{"0":15742}}
sent 99 transactions to kafka
data sent: {"test":{"0":15743}}
sent 100 transactions to kafka

这是我的简单消费者:

kafka = require('kafka-node'),
producer = kafka.Producer,
consumer = kafka.Consumer;

const kclient = new kafka.KafkaClient({
    kafkaHost:'10.0.0.55:9092'
    // kafkaHost:'35.186.191.135:9092'
});
kclient.on('ready',() => {
    console.log(`kclient ready`);
    kconsumer = new consumer(kclient,[{
        topic:'test',
        partition:0
    }]);
    kconsumer.on('error',(err) => {
        console.error(` in kconsumer: \n${err}\n`)
    })
    kconsumer.on('ready',() => {
        console.log(`kconsumer ready`);
        kconsumer.on('message',(msg) => {
            console.log(`recived msg: ${msg}`);
        })

    })

})
kclient.on('error',(err) => {
    console.error(`err in kclient: \n${err}\n`)
})

当我在我的 windows 机器上运行它时,我得到:

kclient ready
 in kconsumer:
TimeoutError: Request timed out after 30000ms

当我在centos机器上运行消费者时,我没有得到任何错误,只是卡住了:

kclient ready

永远不会“kconsumer ready”。

在 DEBUG 模式下运行两者只显示:

...
  kafka-node:KafkaClient kafka-node-client reconnecting to ADDR:9092 +1s
  kafka-node:KafkaClient kafka-node-client createBroker ADDR:9092 +2ms
  kafka-node:Consumer connection closed +1s
  kafka-node:KafkaClient kafka-node-client socket closed ADDR:9092 (hadError: true) +3ms
...

当我测试 kafka 和 zookeeper 在我的 centos 机器上是否一切正常时,我使用生产者/消费者控制台对其进行了测试。

另外,我检查了 zookeeper 和 kafka 的日志,没有错误,但没有迹象表明任何生产者或消费者已连接,或发送任何以太消息。

有没有人遇到过kafka-node库的这个问题?

有没有人找到解决方案?

最佳答案

我不确定它是否符合确切的情况,但我有类似的问题行为。

要使用 Kafka,您的客户端应该可以访问代理(具有 ADDR:9092 的代理)和 kafka 数据 Node 。

当客户端连接到代理时,它说“我需要主题 XXX”。 经纪人回答“主题 XXX 在 Node ADDR-YYY 上” 客户端尝试连接 ADDR-YYY

可能的问题是代理返回 kafka Node 的地址,您的消费者机器无法访问该地址。它可以被阻止(例如,您没有公开特定端口),也可以从名称无法访问(例如,损坏返回本地特定地址,如 kafka-node-1.localhost,什么时候应该是一些原始 IP 或其他东西)。

关于javascript - 使用 kafka-node 创建的消费者和生产者无法连接到 kafka 的工作实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56667509/

相关文章:

javascript - 如何将 Angular 图表与我的信息结合使用?

javascript - 如何检查用户是否是admin?

javascript - 断开连接时更新客户端中的用户列表

python - kafka-python 引发 kafka.errors.ConsumerFetchSizeTooSmall

apache-kafka - 如何仅在窗口完成时输出窗口聚合的结果?

java - 集团属性在Spring Stream Cloud中不起作用?

javascript - 禁用提交按钮并显示消息

javascript - 如何从传递有效负载的组件设置 Redux 中的状态?

node.js - golang tcp socket - 处理多条消息

javascript - jQuery 验证插件正则表达式不区分大小写