spring-boot - 返回 Mono<ServerResponse> 会导致(邪恶的)同步、阻塞客户端/服务器通信吗?

标签 spring-boot spring-webflux spring-reactive spring-reactor

我是 Spring Reactor 和 WebFlux 的新手,对 Spring 功能性 Web 中的事件流有点困惑。 示例:我有一个处理函数返回 Mono<ServerResponse> 。其中,一个findAll()执行存储库方法返回 Flux<T> 。根据响应式(Reactive)声明,为了异步、非阻塞并允许背压,我希望看到 onNext()对于从存储库返回的每个元素。但是,在请求处理期间查看服务器日志,我只看到一个 onNext()事件,这是有道理的,因为我的返回类型是 Mono包含响应:

路由器功能

@Bean
 public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
     return RouterFunctions
             .route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
                     , itemsHandler::getAll);
}

处理函数

Mono<ServerResponse> getAll(ServerRequest request) {
    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(itemRepository.findAll(), Item.class)
            .log("GET items");
}

事件日志

2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | request(unbounded)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onComplete()

相比之下,使用 Flux<T> 实现经典的 Spring 带注释的 Controller 方法作为返回类型,我会看到 onNext()对于 T 的每个实例(即结果集中的每个项目),对我来说看起来更“正确”(客户端现在可以控制事件流等):

Controller

@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
    return itemRepository
            .findAll()
            .log("GET items");
}

日志

2020-05-10 15:14:04.135  INFO 19096 --- [ctor-http-nio-5] GET items                                : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136  INFO 19096 --- [ctor-http-nio-5] GET items                                : request(unbounded)
2020-05-10 15:14:04.137  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onComplete()

这很令人困惑。让我详细说明一下:

  • 使用 Mono<ServerResponse>看起来很邪恶,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破了异步、非阻塞、支持背压的事件流的 react 原则。这不是剥夺了客户端的控制权吗?对我来说,这看起来像是传统的、阻塞的客户端/服务器通信。
  • 返回 Flux<T>直接感觉好多了,因为它支持按结果事件处理和背压控制。

我的问题是:

  • 创建 Mono<ServerResponse> 有何影响? ?这是否会导致阻塞、同步交互,从而发出 onNext()仅当从存储库中读取所有项目时?我会失去背压功能等吗?
  • 如何让功能样式后端发送 onNext()对于结果集中的每个项目?
  • 对于完全响应式的函数式处理函数的返回类型(即非阻塞、异步和反压兼容),最佳实践是什么?我不确定是否Mono<ServerResponse>并没有违反这些 react 性原则。

我可能完全错了或者遗漏了一些重要的东西。感谢您的帮助!

最佳答案

这完全取决于使用 ServerResponse 的客户端。根据 WebFlux 文档( https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux )设置处理函数以返回 Mono<ServerResponse>无论返回的项目数量是标准方式并且绝对没问题 - 只要客户端正确处理底层 Flux<T>一切都很好。我的问题出现是因为我使用 curl 测试了端点,它无法检测底层 Flux 。使用功能风格启用的客户端(如 org.springframework.web.reactive.function.client.WebClient ), Mono<ServerResponse>可以反序列化为 Flux<T>首先,启用所有良好的 react 功能,并使我们的onNext()事件出现。

客户端代码

像这样调用后端,将 ServerResponse 反序列化为 Flux:

@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
    return  webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
            .retrieve()
            .bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
            .log("GET all items from server");
}

将导致看到所有 onNext()事件,启用客户端事件处理:

2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : request(unbounded)
2020-05-10 16:10:10.511  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onComplete()

因此,只要客户端正确处理响应,一切都会完全响应。

关于spring-boot - 返回 Mono<ServerResponse> 会导致(邪恶的)同步、阻塞客户端/服务器通信吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61713107/

相关文章:

java - 无法将字节数组转换为音频AAC文件

java - 为什么 HATEOAS 在使用 Swagger 2.x 启动期间开始为 spring-boot 版本 >= 2.2.x 创建问题?

java - Spring 启动 FileNotFoundException : Could not open ServletContext resource [/WEB-INF/main-servlet. xml]

reactive-programming - Spring WebFlux/Reactor 核心

java - 使用 Spring Reactive (R2DBC) 连接 MSSQL,创建 bean 时出错

java - 即使值存在,也无法单独从 Redis 加载值

java - 将 Maven 转换为 gradle 后,Spring bootRun 失败并出现神秘错误

java - 为什么 Spring 不为关系数据库提供响应式(Reactive)(非阻塞)客户端?

扩展 Flux/实现发布者并多次调用 s.onNext() 时,Spring 5 Reactive 失败

spring - 如何在非响应式(Reactive) Spring EventListener 和响应式(Reactive) Flux 之间架起桥梁