java - Kafka主题以及本地数据库的Kafka偏移量管理

标签 java apache-kafka kafka-consumer-api spring-kafka

我想管理 Kafka 主题和数据库中的偏移量,这样如果我想在某个点之后在队列中重新处理,我就可以。我该怎么做?提前致谢。

最佳答案

给定 PartitionInfo您应该能够告诉您的消费者 seekToBeginningseek 到该分区中的偏移量。

一个ConsumerRecord知道它的主题、分区和偏移量。您可以将这些事实记录在数据库中。

但这里的问题是您的主题是否已分区。然后,您的数据将按该类别的时间顺序排列。因此,如果您有两个分区并且基本上按姓氏分区,则字母表前半部分的名称更改将是连续的,后半部分将是连续的,但如何获取名称更改的单个时间顺序 View 并不明显整个系统。

但是,如果您记录了数据库中特定更改的分区和偏移量,则可以查找该分区和偏移量并从该点重新处理流。

(如果您只有一个分区,这一点就无关紧要,但当您的主题或流式架构需要多个分区时,需要考虑这一点)

从实际问题回到理论,我不太确定你为什么要这样做,因为消费者团体将记录你对 Kafka 本身的 promise 偏移量,因此,如果你的流处理应用程序崩溃,你将能够毫无顾虑地从上次停下的地方继续。如果您设置 enable.auto.commit property,则此消息提交会自动发生。 ,或者如果您在使用者上调用 commitSync() ,您也可以手动控制它。或者您尝试使用不可变数据存储(Kafka),就像使用可变存储一样,但这只是一种纯粹的猜测,因为您并没有真正描述为什么要做您想做的事情要做的事。

关于java - Kafka主题以及本地数据库的Kafka偏移量管理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47325643/

相关文章:

apache-spark - 如何在 Spark Kafka 直接流中手动提交偏移量?

apache-kafka - 出现错误 :Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test2R2P2-1

java - 使用 JAVA 向 LDAP 中的现有用户添加属性

javascript - 如何向 GET 请求发送一个巨大的参数列表

java - Log4J2 - 如何禁用单元测试中的日志记录?

apache-kafka - Kafka Streams - 缺少源主题

apache-kafka - Kafka Consumer 在处理消息时重试次数有限

Golang Kafka 不消耗所有消息 offsetnewest

Rust 声明先分配后模式

java - Maven 构建跳过清理和验证阶段