spring - How to get the consumer for a KafkaListener

Tags: spring spring-kafka

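I am using spring-kafka with Spring Boot 2.0.4.RELEASE.

I receive messages with a @KafkaListener.

Now I want to reset the offset for my consumer group,

but I don't know how to get the consumer for that group.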

    @KafkaListener(id="test",topics={"test"},groupId="group",containerFactory="batchContainerFactory")
    public void listenTopic33(List<ConsumerRecord<Integer, String>> records, Acknowledgment ack){
        // do something
    }


    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public void test() {
        // The id passed here must match the id of the @KafkaListener ("test" above).
        MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer("test");
    }

Best answer

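If you want to seek the consumer from within the listener itself, simply add a Consumer<?, ?> consumer parameter to the listener method.

Bear in mind that the container may already have fetched more messages, so you will receive them before the seek takes effect. You can set max.poll.records=1 to avoid that.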
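As a rough illustration of the max.poll.records setting mentioned above, here is a minimal sketch of a consumer property map built by hand. The class name ConsumerProps and the broker address localhost:9092 are assumptions made for this example; only the property keys are Kafka's own. A map like this is the kind of input a consumer factory (for example the one behind batchContainerFactory in the question) is typically created from.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the class and method names are illustrative only.
class ConsumerProps {

    // Builds consumer properties that limit each poll to a single record,
    // so no extra records are already in hand when a seek is requested.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "group");                   // group id from the question
        props.put("max.poll.records", 1);                 // fetch one record per poll
        return props;
    }
}
```

The trade-off is throughput: with one record per poll, a batch listener receives single-element lists, so this is probably best reserved for cases where a seek has to take effect immediately.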

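You can also add a custom RemainingRecordsErrorHandler to the container and throw an exception in the listener; the error handler will then receive the remaining records instead of the listener.

See also Seeking to a Specific Offset.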

In order to seek, your listener must implement ConsumerSeekAware, which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first method is called when the container is started. You should use this callback when seeking at some arbitrary time after initialization. You should save a reference to the callback. If you use the same listener in multiple containers (or in a ConcurrentMessageListenerContainer), you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.
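The "some other structure keyed by the listener Thread" option could look roughly like the sketch below. It is not spring-kafka code: SeekCallback is a hypothetical stand-in for ConsumerSeekCallback so that the example runs without Kafka on the classpath, and ThreadKeyedCallbacks is an illustrative name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for spring-kafka's ConsumerSeekCallback.
interface SeekCallback {
    void seek(String topic, int partition, long offset);
}

// Keeps one callback per listener thread, keyed by the thread itself
// rather than stored in a ThreadLocal.
class ThreadKeyedCallbacks {

    private final Map<Thread, SeekCallback> byThread = new ConcurrentHashMap<>();

    // Mirrors registerSeekCallback: expected to be called on each listener thread.
    void register(SeekCallback callback) {
        byThread.put(Thread.currentThread(), callback);
    }

    // Returns the callback registered by the calling thread, or null if none was registered.
    SeekCallback forCurrentThread() {
        return byThread.get(Thread.currentThread());
    }

    // Number of threads that have registered a callback.
    int size() {
        return byThread.size();
    }
}
```

Compared with a ThreadLocal, a map like this lets other code enumerate all registered callbacks, at the cost of holding references to the listener threads until the entries are removed.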

When using group management, the second method is called when assignments change. You can use this method, for example, for setting initial offsets for the partitions, by calling the callback. You must use the callback argument, not the one passed into registerSeekCallback. This method is never called if you explicitly assign partitions yourself. Use the TopicPartitionInitialOffset in that case.

The callback has the following methods:

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToEnd(String topic, int partition);

You can also perform seek operations from onIdleContainer() when an idle container is detected. See Detecting Idle and Non-Responsive Consumers for how to enable idle container detection.

To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback for the appropriate thread.

Here is an example; we keep track of the callbacks for each topic/partition...

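import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

@SpringBootApplication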
public class So56584233Application {

    public static void main(String[] args) {
        SpringApplication.run(So56584233Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(new ProducerRecord<>("so56584233", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56584233", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {


    private static final Logger logger = LoggerFactory.getLogger(Listener.class);


    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    private static final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, callbackForThread.get()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "so56584233", topics = "so56584233", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

Regarding "spring - How to get the consumer for a KafkaListener", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/56584233/
