java - Spring WebFlux 和 Reactor 的线程模型

标签 java multithreading reactive-programming project-reactor spring-webflux

目前正在尝试使用 Spring 5.0.0.RC2Reactor 3.1.0.M2Spring Boot 2.0.0.M2 进行响应式编程.

想了解 WebFlux 和 Reactor 用于正确编码应用程序和处理可变状态的并发和线程模型

Reactor 文档指出该库被认为与并发无关,并提到了调度程序抽象。 WebFlux 文档没有提供信息。

然而,当通过 Spring Boot 使用 WebFlux 时,定义了一个线程模型。

从我的实验中我得到了:

  • 模型既不是1个事件线程,也不是1个事件线程+worker
  • 使用了多个线程池
  • reactor-http-nio-3”线程:可能每个核心一个,处理传入的 HTTP 请求
  • Thread-7”线程:用于 MongoDB 或 HTTP 资源的异步请求
  • parallel-1”线程:每个内核一个,由 Reactor 的 Schedulers.parallel() 创建,由延迟运算符等使用
  • 共享的可变状态必须由应用同步
  • ThreadLocal(用于应用程序状态、MDC 日志等)不是请求范围的,所以不是很有趣

这是正确的吗? WebFlux的并发和线程模型是什么:例如默认的线程池是什么?

感谢您提供的信息

最佳答案

问题后,present documentation does provide some clues about the concurrency model以及人们可以期待的线程(但我仍然认为从多线程的角度更清晰/更好地描述幕后发生的事情会受到 Spring 新手的高度赞赏)。

它讨论了 Spring MVC 和 Spring WebFlux 之间的区别(1-thread-per-request 模型与事件循环):

In Spring MVC, and servlet applications in general, it is assumed that applications may block the current thread, e.g. for remote calls, and for this reason servlet containers use a large thread pool, to absorb potential blocking during request handling.

In Spring WebFlux, and non-blocking servers in general, it is assumed that applications will not block, and therefore non-blocking servers use a small, fixed-size thread pool (event loop workers) to handle requests. Invoking a Blocking API

但请注意,Spring MVC 应用程序也可能引入一些异步性(参见 Servlet 3 Async)。我建议this presentation讨论 Servlet 3.1 NIO 和 WebFlux。

回到文档:它还表明,在使用响应式流时,您可以进行一些控制:

What if you do need to use a blocking library?

Both Reactor and RxJava provide the publishOn operator to continue processing on a different thread.

(更多详情请引用scheduling in Reactor)

它还讨论了您可能期望在 WebFlux 应用程序中使用的线程(粗体是我的):

Threading Model

What threads should you expect to see on a server running with Spring WebFlux?

  • On a "vanilla" Spring WebFlux server (e.g. no data access, nor other optional dependencies), you can expect one thread for the server, and several others for request processing (typically as many as the number of CPU cores). Servlet containers, however, may start with more threads (e.g. 10 on Tomcat), in support of both servlet, blocking I/O and servlet 3.1, non-blocking I/O usage.
  • The reactive WebClient operates in event loop style. So you’ll see a small, fixed number of processing threads related to that, e.g. "reactor-http-nio-" with the Reactor Netty connector. However if Reactor Netty is used for both client and server, the two will share event loop resources by default.
  • Reactor and RxJava provide thread pool abstractions, called Schedulers, to use with the publishOn operator that is used to switch processing to a different thread pool. The schedulers have names that suggest a specific concurrency strategy, e.g. "parallel" for CPU-bound work with a limited number of threads, or "elastic" for I/O-bound work with a large number of threads. If you see such threads it means some code is using a specific thread pool Scheduler strategy.
  • Data access libraries and other 3rd party dependencies may also create and use threads of their own.

部分可以通过配置来配置线程模型的细节

To configure the threading model for a server, you’ll need to use server-specific config APIs, or if using Spring Boot, check the Spring Boot configuration options for each server. The WebClient can be configured directly. For all other libraries, refer to their respective documentation.

此外,例如讨论 Default number of threads in Spring boot 2.0 reactive webflux configuration 亮点,

The default number of threads for request handling is determined by the underlying web server; by default, Spring Boot 2.0 is using Reactor Netty, which is using Netty's defaults

这是默认组件及其默认值(以及整体配置,包括通过注释透明注入(inject)的配置)的问题——这也可能在 Spring/Boot 和相应依赖项的版本之间发生变化。 说了这么多,看来你的猜测是对的。

关于java - Spring WebFlux 和 Reactor 的线程模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45019486/

相关文章:

java - 使用 Safenet HSM 和 Java 解包 RSA 加密的 AES key 会泄漏解包的 key

java - 在java中显示日期和时间,比较当前日期和过去日期

c# - 使 [IsOneWay=true] WCF 服务异步与在客户端使用任务调用同步方法之间是否存在显着差异?

java - 防御性复制是否足以从可变线程不安全类创建不可变线程安全类?

rxjs - 为什么 withLatestFrom RxJS 方法不是静态的?

spring - 把阻塞代码包装成一个Mono flatMap,这还是非阻塞操作吗?

java - 为实现接口(interface)的类实现可序列化

java - 将文件(.zip、.jar、...)下载到文件夹

C# Express 2010 多线程

r - Shiny :将侧边栏控件重置为默认值