java - 在 @KafkaListener 注释方法中使用响应式(Reactive) webflux 代码

标签 java apache-kafka spring-webflux spring-kafka project-reactor

我正在使用 spring-kafka 实现一个从特定主题读取消息的消费者。所有这些消息都由它们处理,并通过 REST API 导出到另一个系统。为此,代码使用了 Spring Webflux 项目中的 WebClient,这导致了响应式代码:

  @KafkaListener(topics = "${some.topic}", groupId = "my-group-id")
  public void listenToTopic(final ConsumerRecord<String, String> record) {
    
    // minimal, non-reactive code here (logging, serizializing the string)

    webClient.get().uri(...).retrieve().bodyToMono(String.class)
       // long, reactive chain here
       .subscribe();
  }

现在我想知道这个设置是否合理,或者这是否会导致很多问题,因为 spring-kafka 的 KafkaListener 逻辑本身并不是 react 性的。我想知道是否有必要改用reactor-kafka。 我对整个 react 世界以及 kafka 世界的理解非常有限,但这是我目前假设上述设置需要的内容:

  1. listenToTopic 函数几乎会立即返回,因为大部分工作是在 react 链中完成的,不会阻止函数返回。这意味着,据我所知,KafkaListener 逻辑将假定消息已在该处得到正确处理,因此它可能会确认它并在某个时候也提交它。如果我理解正确,那么这意味着消息的处理可能会出现问题。当 KafkaListener 已经获取下一条记录时,工作仍然可以在之前的 react 链中完成。这意味着如果应用程序依赖于按严格顺序完全处理的消息,那么上述设置将是错误的。但如果没有,那么上面的设置就可以了吗?
  2. 上述设置的另一个问题是,如果有大量消息传入,应用程序可能会重载工作。因为监听器函数几乎立即返回,大量消息可能正在 react 链内部处理同时。
  3. @KafkaListener 逻辑内置的重试逻辑在这里不会真正起作用,因为 react 链内部的异常不会触发它。任何重试逻辑都必须由监听器函数本身内部的响应式(Reactive)代码处理。
  4. 当使用 reactor-kafka 而不是 @KafkaListener 注释时,可以更改第 1 点中描述的行为。因为监听器现在将集成到 react 链中,所以只有当 react 链实际上已经完成了。这样,据我了解,只有在通过 react 链完全处理一条消息后,才会获取下一条消息。这也可能会解决第 2-4 点中描述的问题/行为。

问题:我对情况的理解是否正确?是否还有我遗漏的此设置可能导致的其他问题?

最佳答案

你的理解是正确的;切换到非响应式(Reactive)休息客户端(例如 RestTemplate)或为消费者使用 reactor-kafka

关于java - 在 @KafkaListener 注释方法中使用响应式(Reactive) webflux 代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68059163/

相关文章:

java - checkstyle 不适用于 linux eclipse luna

apache-kafka - Kafka 中的不公平领导者选举 - 所有分区的领导者相同

java - flink - 测量背压

java - 如何在 Kafka 中使用多个消费者?

kotlin - Flux.collectList() 是否引入阻塞行为

spring-mvc - 使用功能性 Webflux 上传文件

嵌套 Json 对象数组的 Java 模型

java - 什么是NullPointerException,我该如何解决?

spring-security - Spring Security Reactive WebFilterChainProxy 仅调用单个过滤器链

java - 为什么 Jackson 库内部实现利用 getter 调用来生成 Json