reactive-programming - 从浏览器/REST 客户端调用时如何自动订阅 REST 端点?

标签 reactive-programming spring-webflux project-reactor reactive-streams

在 ProjectReactor 或 Reactive Streams 中,在您订阅()之前什么都不会发生。

除非有人订阅,否则响应式(Reactive)流数据流不会发生,但我看到所有 REST API,如查找、保存和插入都没有显式调用订阅,但数据在生产者和订阅者之间流动。

@RestController
class PersonController {

      private final PersonRepository repository;

      public PersonController(PersonRepository repository) {
        this.repository = repository;
      }
      @GetMapping("/all")
      public Flux<Person> index() {

         return repository.findAll();

     }
      @GetMapping("/people")
      Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {

        Flux<Person> result = repository.findByLastname(lastname);
        return result.map(it -> it.getFullName());
      }

      @PostMapping("/people")
      Flux<People> AddPeople(@RequestBody Flux<Person> people) {

          return repository.saveAll(people);
      }
}

为什么我们不需要调用订阅 让 REST 端点在 Project Reactor 中启动数据流?

当我从浏览器调用时,REST 端点(HTTP 请求)如何自动订阅 Reactive Streams 以获取数据流?

我在这里错过了什么吗?

最佳答案

你是对的 - 当你的应用程序正在设置 Flux/Mono react 管道,该管道中的任何内容都不会执行,直到 subscribe给它。

这是 Spring WebFlux 中请求/响应交换期间发生的情况:

  • 服务器收到请求并将其转发给 WebFlux
  • 根据请求和您的应用程序代码,将构建一个 react 管道,包括过滤器、 Controller 等。您可以将其视为将请求链接到响应的管道
  • HTTP 客户端通过 TCP 堆栈请求读取,并且该背压信息由底层服务器传输。

  • Spring WebFlux 中的最低合约是 HttpHandler - 它是与底层服务器交互的合约。
    在 Reactor Netty 的情况下,该服务器已经支持 react 流 API,并且订阅是由服务器 native 完成的。
    对于其他基于 Servlet 的服务器,我们使用 react 流桥接至 Servlet 3.1。在 ServletHttpHandlerAdapter ,我们在 react 流世界和异步 I/O Servlet API 之间架起了一座桥梁——订阅实际上发生在那个桥梁中。

    另外:请注意,我们通常不使用 subscribeWebClient 返回的值;如果您不在响应式(Reactive)管道的中间(即不在 Controller 处理程序的中间),您只能这样做。在这些情况下,我们通常将其插入管道中间的响应式(Reactive)运算符;如果不这样做,您将无法保证何时会收到 HTTP 客户端响应——这完全将调用与应用程序的其余部分分离。

    关于reactive-programming - 从浏览器/REST 客户端调用时如何自动订阅 REST 端点?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50795071/

    相关文章:

    javascript - 如何取消订阅或取消对 RxJS observable 大型数组的过滤?

    spring-data-jpa - Spring Webflux (Reactor) Scheduler 在出错后不会终止

    objective-c - RACSignal : how to reduce on arbitrarily large combine

    database - Slick 3.0 在数据库驱动程序级别是响应式(Reactive)/异步的吗?适用于哪些数据库?

    c# - 使用 Rx 为 webservice 调用创建一个轮询请求

    java - 可以通过指标 :9443 port 访问 8080 端口中的 Spring WebFlux 端点

    spring - 如何在非响应式(Reactive) Spring EventListener 和响应式(Reactive) Flux 之间架起桥梁

    spring - Spring Webflux中有没有静态的方法来获取当前的ServerHttpRequest?

    java - WebFlux : onErrorContinue does not continue

    java - as(Function<? super Mono<T>,P> 转换器) 的用法是什么