apache-kafka - Spring kafka 消费者,在运行时寻求偏移量?

标签 apache-kafka kafka-consumer-api spring-kafka

我正在使用 KafkaMessageListenerContainer 从 kafka 主题中进行消费,我有一个应用程序逻辑来处理每个依赖于其他微服务的记录。我现在在处理每条记录后手动提交偏移量。

但是如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。

KafkaMessageListenerContainer 是否可以做到这一点?

最佳答案

Seeking to a Specific Offset .

In order to seek, your listener must implement ConsumerSeekAware which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

关于apache-kafka - Spring kafka 消费者,在运行时寻求偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42996882/

相关文章:

java - 在 REST 端点中流式传输 Kafka 消息

hadoop - 如果我想使用kafka从API获取一些数据。有哪些步骤?

apache-kafka - Storm - 有条件地消耗来自 kafka spout 的流?

java - 在kafka中消费批处理时如何部分提交同步

java - 如何找到消费者组从kafka topic消费数据的开始时间

java - kafka 0.8.2.2有spring支持吗?

java - Spring Boot kafka junit 失败

java - 使用java的play框架中的Kafka Consumer

apache-kafka - BigQuery 以 Kafka 作为源

apache-kafka - Kafka 消费者路径不得以/字符结尾