java - spring-kafka中多个KafkaListener实现共享逻辑

标签 java spring spring-boot apache-kafka spring-kafka

我的 Spring Boot 应用程序包含几个 @KafkaListeners ,并且每个监听器在实际处理有效载荷之前和之后执行相同的步骤:验证有效载荷,检查事件是否已经被处理,检查它是否是逻辑删除(空)消息,决定是否应该在失败的情况下重试处理,发出指标等。

这些步骤目前是在一个基类中实现的,但是由于传递给@KafkaListener的主题在运行时必须是常量,所以在子类中定义了@KafkaListener注解的方法,除了将其参数传递给基类中的方法。

这很好用,但我想知道是否有更优雅的解决方案。我假设我的基类必须以编程方式创建一个监听器容器,但在快速查看 KafkaListenerAnnotationBeanPostProcessor 之后, 它似乎很复杂。

有没有人有任何推荐?

最佳答案

在寻求实现类似的东西时偶然发现了这个问题,我首先从 Artem Bilan 的 answer 着手。 .但是,这不起作用,因为默认情况下,注释不会在子类中继承,除非它们本身使用 @Inherited 进行注释。尽管如此,可能仍然有一种方法可以使注释方法起作用,如果我让它起作用,我会更新这个答案。值得庆幸的是,尽管我已经使用 Kafka 监听器的程序注册实现了预期的行为。

我的代码是这样的:

接口(interface):

public interface GenericKafkaListener {

  String METHOD = "handleMessage";

  void handleMessage(ConsumerRecord<String, String> record);
}

抽象类:

public abstract class AbstractGenericKafkaListener implements GenericKafkaListener {

  private final String kafkaTopic;

  public AbstractGenericKafkaListener(final String kafkaTopic) {
      this.kafakTopic = kafkaTopic;
  }

  @Override
  public void handleMessage(final ConsumerRecord<String, String> record) {
      //do common logic here
      specificLogic(record);
  }

  protected abstract specificLogic(ConsumerRecord<String, String> record);

  public String getKafkaTopic() {
      return kafkaTopic;
  }
} 

然后我们可以通过编程方式在 KafkaListenerConfigurer 中注册所有类型为 AbstractGenericKafkaListener 的 bean:

@Configuration
public class KafkaListenerConfigurataion implements KafkaListenerConfigurer {

  @Autowired
  private final List<AbstractGenericKafkaListener> listeners;

  @Autowired
  private final BeanFactory beanFactory;

  @Autowired
  private final MessageHandlerMethodFactory messageHandlerMethodFactory;

  @Autowired
  private final KafkaListenerContainerFactory kafkaListenerContainerFactory;

  @Value("${your.kafka.consumer.group-id}")
  private String consumerGroup;

  @Value("${your.application.name}")
  private String service;

  @Override
  public void configureKafkaListeners(
    final KafkaListenerEndpointRegistrar registrar) {

      final Method listenerMethod = lookUpMethod();

      listeners.forEach(listener -> {
        registerListenerEndpoint(listener, listenerMethod, registrar);
    });
  }

  private void registerListenerEndpoint(final AbstractGenericKafkaListener listener,
    final Method listenerMethod,
    final KafkaListenerEndpointRegistrar registrar) {

      log.info("Registering {} endpoint on topic {}", listener.getClass(),
        listener.getKafkaTopic());

      final MethodKafkaListenerEndpoint<String, String> endpoint =
        createListenerEndpoint(listener, listenerMethod);
      registrar.registerEndpoint(endpoint);
  }

  private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(
    final AbstractGenericKafkaListener listener, final Method listenerMethod) {

      final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
      endpoint.setBeanFactory(beanFactory);
      endpoint.setBean(listener);
      endpoint.setMethod(listenerMethod);
      endpoint.setId(service + "-" + listener.getKafkaTopic());
      endpoint.setGroup(consumerGroup);
      endpoint.setTopics(listener.getKafkaTopic());
      endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);

      return endpoint;
  }

  private Method lookUpMethod() {
      return Arrays.stream(GenericKafkaListener.class.getMethods())
        .filter(m -> m.getName().equals(GenericKafkaListener.METHOD))
        .findAny()
        .orElseThrow(() ->
            new IllegalStateException("Could not find method " + GenericKafkaListener.METHOD));
  }
}

关于java - spring-kafka中多个KafkaListener实现共享逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50325766/

相关文章:

java - 我如何使用 XStream 为简单 map 生成较小的输出?

spring - 无法使 EnableOauth2Sso 工作 - BadCredentialsException : Could not obtain access token

spring-boot - 在 Kotlin 中迁移到版本 3+ 时,Spring Boot 无法再绑定(bind)配置属性

java - 如何优化周期?

java - Gradle 排除依赖关系极其困惑

java - 模拟@AuthenticationPrincipal 参数

java - 对对象 volatile 变量的正确实例的引用

java - 如何巧妙地连接两个字符串以忽略重复的子字符串

java - 在 IBM bluemix 中使用空间数据

eclipse - Tomcat 没有启动