reactive-programming - 如何以一种非常通用的方式将有效负载 react 性地发送到 Kafka 主题?

标签 reactive-programming spring-webflux spring-cloud-stream project-reactor

我们的代码是完全响应式(Reactive)的,使用 Webflux。我们想将消息发送到 Kafka 队列,并希望响应式地进行。

我看到有一个 spring-cloud-stream-reactive 依赖项支持 Reactive 功能,但用于数据流的特定用例。

Spring Cloud Stream also supports the use of reactive APIs where incoming and outgoing data is handled as continuous data flows.

我知道这里有一种方法可以做到:https://projectreactor.io/docs/kafka/release/reference/

但它看起来像是与 kafka 特定库的紧密耦合。我喜欢这里的松散耦合。

有没有办法让我的代码不耦合到 kafka 库?

最佳答案

因此,在 spring-cloud-stream 2.x 中,我们逐渐弃用响应式模块,取而代之的是 spring-cloud-function它提供开箱即用的响应式支持。这是一个更简单且耦合度更低的编程模型。这里有点more details .

基本上在上面链接的示例中,您需要的只是修改函数本身

@Bean
public Function<Flux<String>, Flux<String>> toUpperCase() {
    return flux -> flux.map(value -> value.toUpperCase());
}

Spring Cloud Stream 和 Spring Cloud Function 将负责订阅、绑定(bind)等。 这是关于 functional programming model 的更多信息在 Spring Cloud Stream 中。

关于reactive-programming - 如何以一种非常通用的方式将有效负载 react 性地发送到 Kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55278630/

相关文章:

java - 如何在Java中的多个线程之间共享公共(public)日志文件

spring-webflux - 如何使用spring webflux读取请求正文

spring-cloud-netflix - 如何将Hystrix与Spring WebFlux WebClient一起使用?

java - Spring Cloud Streams 将数据条件转发到 Kafka 主题

java - Spring Kinesis 消费者太慢

java - RxJava reduce() 在并行化时会不安全吗?

reactive-programming - Spring WebFlux/Reactor 核心

java - 只要主要可观察值未发出任何值,就使用辅助/备用可观察值

system.reactive - 为什么 RX 中的主题称为 "Subject"?

spring - spring cloud stream instanceCount 的用途是什么?