java - 如何在不同线程中处理@KafkaListener方法?

标签 java spring-boot apache-kafka kafka-consumer-api spring-kafka

我在 Spring Boot 中有 kafka 处理程序:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

例如生产者每秒发送一条消息。但是 myService.processResponse 工作 10 秒。我需要处理每条消息并在新线程中启动 myService.processResponse 。我可以创建我的执行者并将每个响应委托(delegate)给它。但我认为 kafka 中还有其他配置可供使用。我找到了2:

1) 将 concurrency = "5" 添加到 @KafkaListener 注释 - 它似乎有效。但我不确定有多正确,因为我有第二种方法:

2) 我可以创建 ConcurrentKafkaListenerContainerFactory 并将其设置为 ConsumerFactoryconcurrency

我不明白这些方法之间的区别?只需将 concurrency = "5" 添加到 @KafkaListener 注释就足够了,还是我需要创建 ConcurrentKafkaListenerContainerFactory

或者我根本不明白任何事情,还有其他方法吗?

最佳答案

在管理已提交的偏移量方面,使用执行器会使事情变得复杂;不推荐。

使用@KafkaListener,框架会为您创建一个ConcurrentKafkaListenerContainerFactory

注释上的

concurrency 只是为了方便;它会覆盖出厂设置。

这允许您将同一个工厂与多个监听器一起使用,每个监听器具有不同的并发性。

您可以使用启动属性设置容器并发(默认);该值被注释值覆盖;请参阅 javadocs...

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";

关于java - 如何在不同线程中处理@KafkaListener方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55241976/

相关文章:

java - 设计建议,未能将工厂与泛型结合起来

java - 通过线路输入或麦克风输入以 Java 进行连续音频录制

java - 使用容器管理的实体bean时,容器何时将bean存储到数据库中?

mongodb - Spring Boot连接到Mongo容器运行的MongoDB副本集

java - 忽略属性占位符 - Spring Boot

apache-kafka - Apache Flink State Store 与 Kafka Streams

java - 如何高效地完全展开一个大的JTree

database - 用Spring Boot创建h2数据库,如果不存在则不要删除。桌面应用程序

java - Kafka消费者输出的数据不一致

java - 调试自定义 Kafka 连接器的简单有效方法是什么?