java - Kafka 消费者 - 暂停对特定 Kafka 主题分区的事件轮询,以将其用作延迟队列

标签 java spring-boot apache-kafka kafka-consumer-api spring-kafka

我们的系统中有一个场景,其中 kafka 主题 XYZ 用户详细信息由其他某个生产应用程序 A(不同的系统)发布,而我的应用程序 B 正在使用该主题。

要求是应用程序 B 需要在 A 将事件放入 kafka 主题 XYZ 后 45 分钟(或任何可配置的时间)使用该事件(此延迟的原因是某些应用程序的另一个 REST api系统 C 需要根据特定用户的此用户详细信息事件触发,以确认是否为该用户设置了某些标志,并且可以在 45 分钟持续时间内的任何点设置该标志,尽管如果 C 没有这样做,则可以解决该问题有能力发布到 kafka 或以任何方式通知我们)。

我们的应用程序 B 是在 spring 中编写的。

我尝试的解决方案是从 Kafka 获取事件并检查队列中第一个事件的时间戳,如果该事件已经是 45 分钟,则处理它,或者如果小于 45 分钟,则暂停轮询 kafka 容器使用 MessageListnerContainer pause() 直到达到 45 分钟为止的时间方法。 如下所示 -

@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
        public void delayedConsumer(@Payload  String message,
                                    Acknowledgment acknowledgment) {

            UserDataEvent userDataEvent = null;
            try {
                 userDataEvent = this.mapper.readValue(message, TopicRequest.class);
            } catch (JsonProcessingException e) {
                logger.error("error while parsing message");
            }
            MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
            if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
 {
                long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
                // give negative ack to put already polled messages back to kafka topic
                acknowledgment.nack(1000);
                // pause container, and later resume it  
                delayedContainer.pause();
                ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
                scheduledExecutorService.schedule(() -> {
                    delayedContainer.resume();
                }, sleepTimeForPolling, TimeUnit.MILLISECONDS);
                return;
            }
            // if message was already 45 minutes old then process it
            this.service.processMessage(userDataEvent);
            acknowledgment.acknowledge();
        }

虽然它适用于单个分区,但我不确定这是否是正确的方法,对此有什么评论吗?我还看到多个分区会导致问题,如上面的暂停方法调用将暂停整个容器,如果其中一个分区有旧消息,如果容器由于其他分区中的新消息而暂停,则不会消耗该消息。 我可以在分区级别使用此暂停逻辑吗?

在这种情况下,有什么更好/推荐的解决方案可以在一定的可配置时间后实现这种延迟处理,而不是像上面那样做?

最佳答案

Kafka 并不是真正为此类场景设计的。

我认为该技术有效的一种方法是将容器并发设置为与主题中的分区数量相同,以便每个分区由不同线程上的不同使用者处理;然后暂停/恢复个人 Consumer<?, ?> s 而不是整个容器。

为此,请添加 Consumer<?, ?>作为附加参数;要恢复消费者,请设置 idleEventInterval并检查事件监听器中的计时器 ( ListenerContainerIdleEvent )。 Consumer<?, ?>是事件的属性,因此您可以调用 resume()那里。

关于java - Kafka 消费者 - 暂停对特定 Kafka 主题分区的事件轮询,以将其用作延迟队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60055095/

相关文章:

尝试删除形状时出现 JavaFX 错误

java - 如何在 JPanel 和 JFrame 范围内绘图?

java - 可以为每个 SpringbootTest 注解加载特定的 data.sql

java - 创建名称为 'projectingArgumentResolverBeanPostProcessor' 的 bean 时出错

scala - kafka-clients scala 库如何管理 TCP 连接?

java - Android 模拟器运行不正常

java - 如何覆盖Spring Boot属性?

java - 无法理解 Spring Boot 代码流程

java - Kafka 使用 Sink Connector 进行 google Pub/Sub

apache-kafka - 当用于存储所有业务事件时,如何为 Kafka 建模主题和分区?