java - 如何从 Kafka 主题检索最新消息

标签 java apache-kafka

我只是 Kafka 的新手,我有一个问题:

我在 Kafka 中有主题“A”,我启动 Spring boot 应用程序并使用 MessageChannel 向主题“A”发送一些消息,然后停止应用程序。

当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(不是所有消息)?我搜索了所有解决方案,但它们对我没有多大帮助,如果我只发送新消息,它总是立即收到消息。如果您有可运行的代码,请分享,我非常感激:(

    // Start application


    // Get latest message in topic 'A' then do some LOGIC
    if (exist latest message) {
          //Print latest message
    }

最佳答案

您的消费者存储他们的偏移量(即最后读取的位置)。重新启动后,他们从此时继续阅读。此行为是设计使然。

当由于某种原因尚不知道该特定消费者组的偏移量(例如,它是一个新的消费者组或偏移量已过期)时,将使用偏移量重置属性,但通常仍然有两个选项 - a) 重新读取一切从头开始 b) 开始监听新消息,忘记之前的一切。

有一些hacky方法可以实现您所描述的目的,但它们并不简单,也不推荐(一个简单的方法:消费者消息,只是跳过它们,直到到达分区 EOF)

也许仅仅 Kafka 并不是解决这个问题的正确工具。

关于java - 如何从 Kafka 主题检索最新消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54180946/

相关文章:

authentication - 基于客户端证书 DN 或其部分的 Kafka ACL to topic

java - 如何强制控制台输出更大的数字

Spring Cloud Data Flow 流无法部署在 Kubernetes 中

java - Apache Kafka 生产者如何并行地将记录发送到分区?

cassandra - 恰好一次和至少一次保证之间的区别

mongodb - 卡夫卡接收器连接器中无效JSON的错误处理

java - 数组上的算法优化

java - 使用 %h 格式化整数并添加前导零

java - 当 Java if 语句同时具有赋值和相等性检查 OR - d 时,它如何工作?

java - 以编程方式从模式中获取类 (Apache avro)