spring-webflux - Spring Webflux - 服务器/客户端线程利用率

标签 spring-webflux project-reactor reactive

我正在使用带有 Netty 的 Spring Webflux (2.0.3.RELEASE) 并尝试了解服务器和 Web 客户端如何使用线程。我用 WebClient 编写了一些带有 http 调用链的代码。我怀疑所有调用都是非阻塞的,但我不明白为什么只有一个请求通过整个链。这是下面的代码和日志输出:

public class DemoApplication {

private WebClient webclient = WebClient.create("http://localhost:8080/");

public static void main(String[] args) throws Exception {
    new DemoApplication().startServer();
}

public void startServer() throws Exception {
    RouterFunction<ServerResponse> route = routingFunction();
    HttpHandler httpHandler = RouterFunctions.toHttpHandler(route);
    ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
    HttpServer server = HttpServer.create("127.0.0.1", 8080);
    server.newHandler(adapter).block();
    Thread.sleep(1000000);
}

public RouterFunction<ServerResponse> routingFunction() throws Exception {
    return route(path("/1"), req -> ok().body(fromPublisher(get1(), String.class)))
            .andRoute(path("/2"), req -> ok().body(fromPublisher(get2(), String.class)))
            .andRoute(path("/3"), req -> ok().body(fromPublisher(get3(), String.class)));
}

public Mono<String> get1() {
    System.out.println("---------REQUEST---------");
    System.out.println("1: " + Thread.currentThread());
    return webclient.get().uri("2").retrieve().bodyToMono(String.class);
}

public Mono<String> get2() {
    System.out.println("2: " + Thread.currentThread());
    return webclient.get().uri("3").retrieve().bodyToMono(String.class);
}

public Mono<String> get3() {
    System.out.println("3: " + Thread.currentThread());
    try {
        Thread.sleep(1250000); // simulate thread somehow got blocked
    } catch (InterruptedException e) {

    }
    return Mono.just("test");
}
}

我对 localhost:8080/1 进行了 4 次调用并获得以下输出。只有一个请求设法到达第三种方法。我希望当一个线程被阻塞时,其他三个线程将能够处理其他请求,但他们没有。整个线程池由4个线程组成(与内核数相同)。

---------REQUEST---------
1: Thread[reactor-http-nio-2,5,main]
2: Thread[reactor-http-nio-4,5,main]
3: Thread[reactor-http-nio-2,5,main]
---------REQUEST---------
1: Thread[reactor-http-nio-3,5,main]
2: Thread[reactor-http-nio-1,5,main]
---------REQUEST---------
1: Thread[reactor-http-nio-3,5,main]
2: Thread[reactor-http-nio-1,5,main]
---------REQUEST---------
1: Thread[reactor-http-nio-3,5,main]
2: Thread[reactor-http-nio-1,5,main]

你能解释一下这种行为吗?

--------编辑--------

解释: https://groups.google.com/forum/#!topic/netty/1kAS-FJWGRE

最佳答案

您已经知道这一点,但其他开发人员阅读此文:您永远不应该在 Reactor 应用程序中使用阻塞操作 - 或者如果您这样做,您应该在弹性 Scheduler 上安排这项工作并注意权衡。

如果你想模拟一个需要很长时间响应的远程服务,你可以使用 delay*运算符,而不会人为地阻塞线程。在这种情况下,我猜您想模拟您的应用程序的一部分正在响应式(Reactive)管道中使用阻塞 I/O 的事实。

我认为您在这里看到的行为与服务器调用自身以及 WebClient 这两个事实有关。和 Netty 服务器共享相同的 EventLoopGroup .我不知道在那种情况下工作窃取的实现细节。

我用这样的阻塞处理程序简化了这个例子:

@Bean
public RouterFunction<ServerResponse> routingFunction() throws Exception {
    return route(all(), this::handler);
}

Mono<ServerResponse> handler(ServerRequest request) {
    System.out.println("---------REQUEST---------");
    System.out.println(Thread.currentThread());
    try {
        Thread.sleep(1250000); // simulate thread somehow got blocked
    } catch (InterruptedException e) {

    }
    return ServerResponse.ok().build();
} 

在这种情况下,使用 curl 调用该服务器客户端按预期显示以下行为(在 8 核笔记本电脑上)。

---------REQUEST---------
Thread[reactor-http-nio-2,5,main]
---------REQUEST---------
Thread[reactor-http-nio-3,5,main]
---------REQUEST---------
Thread[reactor-http-nio-4,5,main]
---------REQUEST---------
Thread[reactor-http-nio-5,5,main]
---------REQUEST---------
Thread[reactor-http-nio-6,5,main]
---------REQUEST---------
Thread[reactor-http-nio-7,5,main]
---------REQUEST---------
Thread[reactor-http-nio-8,5,main]
---------REQUEST---------
Thread[reactor-http-nio-1,5,main]

关于spring-webflux - Spring Webflux - 服务器/客户端线程利用率,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50900750/

相关文章:

java - 如何将pdf/byte[]消息阅读器添加到exchangeStrategies,不支持内容类型 'application/pdf'

java - 发送收到的 spring FilePart 而不保存

java - 如果我们已经有 subscribe() 了,我们真的需要 doOnNext() 吗?

java - 如何迭代 Flux 并与 Mono 混合

java - 响应式(Reactive) Redis 主题中发布的消息不会发送到客户端

r - 根据无功输出值设置最大 slider 输入值

ios - Rx swift : How to create cache for last network response without creating class/struct property?

java - Spring Webflux - 将数据流发送到端点

java - 在 spring webflux 中使用 uri() 时如何保留 baseUrl

java - Spring Reactive WebClient