node.js - kafka-node 消费者收到 offsetOutOfRange 错误

标签 node.js apache-kafka kafka-consumer-api

我正在使用 kafka-node(kafka 的 Node 客户端),使用消费者检索有关主题的消息。不幸的是,我收到了“offsetOutOfRange”条件(调用了 offsetOutOfRange 回调)。我的应用程序运行良好,直到消费者明显落后于生产者,在最早和最新的偏移量之间留下了很大的差距。在这一点上,我(也许是错误的)假设消费者将能够继续接收消息(并希望 catch 生产者)。

我的kafka消费者客户端代码如下:

:
:
var kafka = require('kafka-node');

var zookeeper = "10.0.1.201:2181";
var id = "embClient";

var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper, id);
var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } );

consumer.on('error', [error callback...]);

consumer.on('offsetOutOfRange', [offset error callback...]);

consumer.on('message', [message callback...]);
:
:

我是不是做错了什么,还是遗漏了什么?

如果没有,我有几个问题:

(a) 是否有一种公认的“最佳”方式来编写客户端以优雅地处理这种情况?

(b) 为什么会提出这个条件? (我假设客户端应该能够从中断的地方继续阅读消息,最终(理想情况下) catch ...)

(c) 我是否需要编写代码/逻辑来处理这种情况,并显式重新定位要读取的消费者偏移量? (这似乎有点麻烦)...

感谢任何帮助。

最佳答案

我认为该应用可能会尝试读取 Kafka 中不再可用的消息。 Kafka 根据 log.retention.* 属性删除旧消息。假设您已向 Kafka 发送了 1000 条消息。由于保留,Kafka 删除了前 500 条消息。如果您的应用程序尝试读取消息 350,它将失败并引发 offsetOutOfRange 错误。这可能是因为您的消费者速度太慢,以至于 Kafka 代理在您的消费者可以处理消息之前就已经删除了消息。或者您的消费者崩溃了,但最后处理的消息的偏移量保存在某处。

您可以使用 Offset class检索最新/最早的可用偏移量(请参阅方法 fetch)并更新消费者的偏移量。我们使用这种方法。

一般来说,当这种情况发生时,很难判断应用程序应该做什么,因为很明显有些地方出了问题。

希望对你有帮助 卢卡什

关于node.js - kafka-node 消费者收到 offsetOutOfRange 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32191582/

相关文章:

node.js - 如何在一定时间内阻止api重复请求

java - GlobalKTable - StreamsException : Encountered a topic-partition not associated with any global state store

apache-kafka - 尝试删除Kafka中的消费者组时出现GroupNotEmptyException

javascript - kafka-node 有没有办法在崩溃后请求收到的消息?

apache-kafka - 信息客户端/127.0.0.1 :48452 which had sessionid 0x15698f5ac360001 (org. apache.zookeeper.server.NIOServerCnxn 的已关闭套接字连接)

node.js - 如何使用 ChainCode Mock Stub 测试 Node js 链码

javascript - 更改数据库后如何更新angularjs View

node.js - 在这种情况下,Redis 是否会出现单点故障(Redis、Node.JS、Socket.IO)?

java - 如何创建Kafka消费者库来消费多个主题

apache-kafka - kafka中的Bootstrap服务器与zookeeper?