我正在使用 KafkaMessageListenerContainer 从 kafka 主题中进行消费,我有一个应用程序逻辑来处理每个依赖于其他微服务的记录。我现在在处理每条记录后手动提交偏移量。
但是如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。
KafkaMessageListenerContainer 是否可以做到这一点?
最佳答案
见 Seeking to a Specific Offset .
In order to seek, your listener must implement
ConsumerSeekAware
which has the following methods:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a
ConcurrentMessageListenerContainer
) you should store the callback in aThreadLocal
or some other structure keyed by the listener Thread.
关于apache-kafka - Spring kafka 消费者,在运行时寻求偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42996882/