java - Spring Web-Flux 中的背压机制

标签 java reactive-programming spring-webflux backpressure

我是首发 Spring Web-Flux .我写了一个 Controller 如下:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}

我知道其中一项 react 性好处是 背压 ,它可以平衡请求或响应率。我想实现如何在 中使用背压机制Spring Web-Flux .

最佳答案

WebFlux 中的背压
为了理解 Backpressure 在 WebFlux 框架的当前实现中是如何工作的,我们必须在这里回顾一下默认使用的传输层。我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器的通信通常也是如此)是通过 TCP 连接完成的。 WebFlux 还使用该传输在客户端和服务器之间进行通信。
然后,为了理解背压控制术语的含义,我们必须从 Reactive Streams 规范的角度回顾一下背压的含义。

The basic semantics define how the transmission of stream elements is regulated through back-pressure.


因此,根据该陈述,我们可以得出结论,在 Reactive Streams 中,背压是一种机制,它通过传输(通知)接收者可以消耗多少元素来调节需求;这里我们有一个棘手的问题。 TCP 具有字节抽象而不是逻辑元素抽象。我们通常所说的背压控制是指控制向/从网络发送/接收的逻辑元素的数量。即使 TCP 有自己的流量控制(参见含义 here 和动画 there),该流量控制仍然针对字节而不是针对逻辑元素。
在WebFlux模块的当前实现中,背压是由传输流控制来调节的,但是并没有暴露接收者的真实需求。为了最终看到交互流程,请看下图:
enter image description here
为简单起见,上图显示了两个微服务之间的通信,其中左侧发送数据流,右侧使用该流。以下编号列表提供了该图的简要说明:
  • 这是 WebFlux 框架,它适当注意将逻辑元素转换为字节并返回以及将它们传输到 TCP(网络)/从 TCP(网络)接收它们。
  • 这是元素的长时间运行处理的开始,一旦作业完成就请求下一个元素。
  • 在这里,虽然没有业务逻辑的需求,但 WebFlux 会将来自网络的字节排入队列,而无需他们的确认(业务逻辑没有需求)。
  • 由于 TCP 流量控制的性质,服务 A 仍可能向网络发送数据。

  • 从上图中我们可以注意到,接收方暴露的需求与发送方的需求不同(这里是逻辑元素中的需求)。这意味着两者的需求是隔离的,仅适用于 WebFlux <-> 业务逻辑(服务)交互,并且对服务 A <-> 服务 B 交互暴露较少的背压。所有这一切都意味着 WebFlux 中的背压控制并不像我们预期的那样公平。
    所有这一切都意味着 WebFlux 中的背压控制并不像我们预期的那样公平。
    但是我还是想知道怎么控制背压
    如果我们仍然希望在 WebFlux 中对背压进行不公平的控制,我们可以在 Project Reactor 操作符(例如 limitRate() )的支持下做到这一点。 .以下示例显示了我们如何使用该运算符:
    @PostMapping("/tweets")
    public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
        
        return tweetService.process(tweetsFlux.limitRate(10))
                           .then();
    }
    
    正如我们从示例中看到的,limitRate()运算符允许定义一次预取的元素数量。这意味着即使最终订阅者请求 Long.MAX_VALUE元素,limitRate运营商将该需求拆分为多个块,并且不允许一次消耗更多。我们可以对元素发送过程做同样的事情:
    @GetMapping("/tweets")
    public Flux<Tweet> getAllTweets() {
        
        return tweetService.retreiveAll()
                           .limitRate(10);
    }
    
    上面的例子表明,即使 WebFlux 一次请求超过 10 个元素,limitRate()将需求限制为预取大小,并防止一次消耗超过指定数量的元素。
    另一种选择是实现自己的 Subscriber或延长 BaseSubscriber来自 Project Reactor。例如,以下是我们如何做到这一点的一个简单示例:
    class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
    
        int consumed;
        final int limit = 5;
    
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(limit);
        }
        
        @Override
        protected void hookOnNext(T value) {
            // do business logic there 
    
            consumed++;
            
            if (consumed == limit) {
                consumed = 0;
                
                request(limit);
            }
        }
    }
    
    使用 RSocket 协议(protocol)公平背压
    为了通过网络边界实现逻辑元素背压,我们需要一个合适的协议(protocol)。幸运的是,有一个叫RScoket protocol . RSocket 是一种应用级协议(protocol),允许通过网络边界传输实际需求。
    该协议(protocol)有一个 RSocket-Java 实现,允许设置 RSocket 服务器。在服务器到服务器通信的情况下,相同的 RSocket-Java 库也提供了客户端实现。要了解更多如何使用 RSocket-Java,请参见以下示例 here .
    对于浏览器-服务器通信,有一个 RSocket-JS允许通过 WebSocket 在浏览器和服务器之间连接流通信的实现。
    基于 RSocket 的已知框架
    现在有一些框架建立在 RSocket 协议(protocol)之上。
    变形杆菌
    其中一个框架是 Proteus 项目,它提供构建在 RSocket 之上的成熟微服务。此外,Proteus 与 Spring 框架很好地集成在一起,所以现在我们可以实现公平的背压控制(参见示例 there)
    进一步阅读
  • https://www.netifi.com/proteus
  • https://medium.com/netifi
  • http://scalecube.io/
  • 关于java - Spring Web-Flux 中的背压机制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52244808/

    相关文章:

    Java BufferedReader ,重置读取器

    java - android audiomanager 中的 MODE_IN_COMMUNICATION 错误?

    java - 非 react 性客户端(RestTemplate)是否可以使用 react 性 REST API(WebFlux)

    kotlin - 为什么执行挂起的存储库函数而不执行非挂起的存储库函数?

    spring-webflux - 添加重试WebClient的所有请求

    java - 有没有办法合并多个单声道错误信号?

    Java : Converting colored image to monochrome and keep text readable

    java - 由于版本差异,jboss 中出现 log4j 错误

    ios - RxSwift - 验证文本字段是否聚焦

    ios - 使用 RxSwift 循环