spring - 背压在Project Reactor中如何工作?

标签 spring rx-java rx-java2 project-reactor reactor

我一直在Spring Reactor中工作,并且之前进行过一些测试,这些使我想知道Fluxes如何默认处理背压。我知道onBackpressureBuffer之类的东西存在,我也读过RxJava defaults to unbounded until you define whether to buffer, drop, etc.

因此,谁能为我澄清:Reactor 3中的助焊剂默认的背压行为是什么?

我尝试搜索答案,但未找到任何明确的答案,仅找到Backpressure的定义或上面为RxJava链接的答案

最佳答案

什么是背压?

Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high - Reactor Reference


当我们谈论背压时,我们必须将源/发布者分为两组:尊重订阅者需求的群体和忽略订阅者需求的群体。
通常,热门消息源不遵守订户需求,因为它们经常产生实时数据,例如收听Twitter提要。在此示例中,订户无法控制创建鸣叫的速率,因此很容易使其不知所措。
另一方面,在订阅发生时,冷源通常按需生成数据,例如发出HTTP请求,然后处理响应。在这种情况下,您正在调用的HTTP服务器将仅在发送请求后发送响应。
重要的是要注意,这不是一条规则:并非每个热源都忽略需求,也不是每个冷源都尊重需求。您可以阅读更多有关热和冷资源here的信息。
让我们看一些可能有助于理解的示例。
尊重需求的出版商
给定一个产生从1到Integer.MAX_VALUE的数字的通量,并给出一个处理步骤,该处理步骤需要100毫秒来处理单个元素:
Flux.range(1, Integer.MAX_VALUE)
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
    .blockLast();
让我们看一下日志:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)
我们可以看到,在每个onNext之前都有一个请求。请求信号由concatMap运算符发送。当concatMap完成当前元素并准备接受下一个元素时,将发出信号。源仅在接收到来自下游的请求时才发送下一个项目。
在此示例中,背压是自动的,我们不需要定义任何策略,因为运算符(operator)知道它可以处理的内容,并且源代码会尊重它。
忽略需求且未定义背压策略的发布者
为了简单起见,我为该示例选择了一个易于理解的冷发行商。它是Flux.interval,它在指定的时间间隔内发射一项。有道理的是,这个冷淡的发布者不尊重需求,因为看到以比最初指定的间隔更长的不同间隔发出的项目是很奇怪的。
让我们看一下代码:
Flux.interval(Duration.ofMillis(1))
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)))
    .blockLast();
源每毫秒发射一项。订户能够每100毫秒处理一项。显然,订户无法跟上制作人的步伐,我们很快就会收到类似这样的异常:
reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    ...
我们如何避免这种异常(exception)?
忽略需求和背压策略的发布者已定义
默认的背压策略是我们上面已经看到的策略:因错误而终止。 Reactor对我们不执行任何错误处理策略。当我们看到这种错误时,我们可以决定哪种错误最适用于我们的用例。
您可以在Reactor reference中找到其中的几个。
在此示例中,我们将使用最简单的示例:onBackpressureDrop
Flux.interval(Duration.ofMillis(1))
    .onBackpressureDrop()
    .concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
    .doOnNext(a -> System.out.println("Element kept by consumer: " + a))
    .blockLast();
输出:
Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407
我们可以看到,在前32个项目之后,跳至2400的幅度很大。由于定义了策略,因此删除了之间的元素。
关键要点
  • 背压通常是自动的,我们不需要做任何事情,因为我们可以按需获取数据。
  • 如果源不符合订户需求,我们需要定义一种策略来避免终止错误。

  • 更新:
    有用的读物​​:How to control request rate

    关于spring - 背压在Project Reactor中如何工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57296097/

    相关文章:

    java - 在 JPA 中保留非原始数据

    android - 使用 RxJava 时如何重试 HTTP 错误 (401) 的 Retrofit 调用?

    android - 如何为单元测试选择正确的调度程序

    java - Rx-java 2 : How to set priority between Observables?

    android - RxJava - 如何在另一个流等待第一个项目时缓冲流中的所有项目?

    java - Spring计划任务

    java - 如何验证自定义注释属性?

    java - 是否可以将 Web 应用程序构建为通用产品而不是一次性定制解决方案?

    java - PublishSubject - 有没有办法执行 onNext()?

    rx-java - 观察者一次性