java - 如何使用 Spring Kafka 实现有状态的消息监听器?

标签 java spring apache-kafka spring-kafka

我想使用 Spring Kafka API 实现一个有状态的监听器。

鉴于以下情况:

  • 一个 ConcurrentKafkaListenerContainerFactory,并发设置为“n”
  • @KafkaListener 注释方法,在 Spring @Service 类上

然后将创建“n”个 KafkaMessageListenerContainers。其中每一个都有自己的 KafkaConsumer,因此会有“n”个消费者线程——每个消费者一个。

当消息被消费时,将使用轮询底层 KafkaConsumer 的同一线程调用 @KafkaListener 方法。由于只有一个监听器实例,因此该监听器需要是线程安全的,因为将有来自“n”个线程的并发访问。

我不想考虑并发访问,而是将状态保存在我知道只会被一个线程访问的监听器中。

如何使用 Spring Kafka API 为每个 Kafka 消费者创建一个单独的监听器?

最佳答案

你是对的;每个容器有一个监听器实例(无论是否配置为 @KafkaListenerMessageListener)。

一种变通方法是使用原型(prototype)作用域 MessageListener 和 n 个 KafkaMessageListenerContainer bean(每个有 1 个线程)。

然后,每个容器都会获得自己的监听器实例。

@KafkaListener POJO 抽象是不可能的。

不过,通常使用无状态 bean 会更好。

编辑

我发现了另一种使用 SimpleThreadScope 的解决方法...

@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}

(容器并发为 3)。

它工作正常:

bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2

唯一的问题是范围不会自行清理(例如,当容器停止并且线程消失时。这可能并不重要,具体取决于您的用例。

要解决这个问题,我们需要容器的一些帮助(例如,在监听器线程停止时发布一个事件)。 GH-762 .

关于java - 如何使用 Spring Kafka 实现有状态的消息监听器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51658210/

相关文章:

java - 当用户按下退出键时,如何中断线程并要求其完成工作?

spring - 如何配置Apache FtpServer?

c - 用C发送二进制结构到kafka

apache-kafka - Kafka GlobalKTable 延迟问题

java - 如何在Kafka中使用多线程

java - 为什么 MediaRecorder 延迟开始录制?

java - 具有附加约束的 @OneToOne 关系

java - 无法弄清楚为什么我在计算中得到空值

java - 使用 ENUM 作为配置文件,但需要一个字符串

java - JSP 表达式语言错误