我只是 Kafka 的新手,我有一个问题:
我在 Kafka 中有主题“A”,我启动 Spring boot 应用程序并使用 MessageChannel 向主题“A”发送一些消息,然后停止应用程序。
当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(不是所有消息)?我搜索了所有解决方案,但它们对我没有多大帮助,如果我只发送新消息,它总是立即收到消息。如果您有可运行的代码,请分享,我非常感激:(
// Start application
// Get latest message in topic 'A' then do some LOGIC
if (exist latest message) {
//Print latest message
}
最佳答案
您的消费者存储他们的偏移量(即最后读取的位置)。重新启动后,他们从此时继续阅读。此行为是设计使然。
当由于某种原因尚不知道该特定消费者组的偏移量(例如,它是一个新的消费者组或偏移量已过期)时,将使用偏移量重置属性,但通常仍然有两个选项 - a) 重新读取一切从头开始 b) 开始监听新消息,忘记之前的一切。
有一些hacky方法可以实现您所描述的目的,但它们并不简单,也不推荐(一个简单的方法:消费者消息,只是跳过它们,直到到达分区 EOF)
也许仅仅 Kafka 并不是解决这个问题的正确工具。
关于java - 如何从 Kafka 主题检索最新消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54180946/