java - 如何在 Spring WebFlux 中配置背压?

标签 java reactive-programming spring-webflux project-reactor backpressure

我试图了解如何在 Spring WebFlux 中应用背压。我了解背压的理论,但我无法重现它,所以我没有完全理解它。

让我们看下面的例子:

public void test() throws InterruptedException {
    EmitterProcessor<String> processor = EmitterProcessor.create();

    new Thread(() -> {
        int i = 0;
        while(runThread) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
            processor.onNext("Value: " + i);
            i++;
        }
        processor.onComplete();
    }).start();

    processor
            .subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}

private Consumer<String> makeSubscriber(String label) {
    return v -> {
        System.out.println(label + v);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
    };
}

我以 EmitterProcessor 的形式创建了一个 Hot Flux,并在一个单独的线程中开始为其生成数据。 低一点,我订阅它。订阅者的速度比元素的生成速度慢,所以问题应该开始发生,对吧? 但订阅者逻辑是在生产者线程上运行的。当我调用processor.onNext()时,它会同步调用所有订阅者,因此如果订阅者很慢,发布者也会变慢。所以,背压似乎根本没有用。

我还尝试制作两个 Spring Boot WebFlux 应用程序,一个具有 Flux 端点,另一个使用该端点,因此我可以确定使用者在单独的线程上运行。但是,我在消费者中所做的任何背压尝试都不起作用。没有缓冲区被填充,没有任何东西被丢弃或任何东西!

谁能给我一个背压的具体例子吗?最好是在 Spring WebFlux 中,但我会采用任何反应式 Java 库。

最佳答案

您选择的订阅方法变体的文档如下:

订阅将请求无限制的需求 (Long.MAX_VALUE)。

也就是说,您自己关闭了背压。

要使用背压,请订阅 Flux.subscribe(Subscriber)

关于java - 如何在 Spring WebFlux 中配置背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60222240/

相关文章:

java - 如何使用 Eclipse 在 Java 中生成 Web 服务客户端

java - 添加 equals() 和 hashCode() 方法会破坏某些东西吗

c# - 从 Observable 的 Subscribe 方法返回 Disposable

kotlin - Mono.fromCallable 中的异常不会导致错误

spring-webflux - 如何使用 spring webflux 进行文件流传输

java - 如何复制二维数组并添加新行来存储 Java 中列的总和?

java - 在 java servlet 中创建异步进程

c# - 如何优化递归函数的 Reactive 实现

system.reactive - 在 C# 中使用 Reactive Extensions 时如何显示进度

spring-webflux - 当 Flux 为空时返回 404