java - Reactor Netty 在 HttpClient 订阅时未获取 HttpServer 响应,只有在 HttpClient 阻塞时才获取

标签 java netty reactive-programming reactor-netty

使用 github 中的示例 HttpClient/HttpServer 示例,当 HttpClient 订阅而不是阻止时,我尝试打印示例 HttpServer 的响应。下面是使用的 HttpServer 示例:

public class Server {

    public static void main(String[] args) {

        DisposableServer server =
                HttpServer.create()
                        .host("10.0.0.19")
                        .port(61005)
                        .route(routes ->
                                routes.get("/hello",
                                        (request, response) -> response.sendString(Mono.just("Hello World!")))
                                    .post("/echo",
                                        (request, response) -> response.sendString(Mono.just("Hello World!"))))
                        .bindNow();

        server.onDispose()
                .block();
    }
}

以下是 HttpClient 和 HttpServer 使用的 Maven 依赖项:

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.9.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>

如果 HttpClient 阻塞,则请求和响应将正常工作,如以下阻塞代码所示:

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) {

        String responseStr = HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .block();

        System.out.println(responseStr);
    }
}

但是HttpClient订阅了onSuccess、onError、onCompletion回调后,没有任何响应,即onSuccess、onError、onCompletion都没有执行:

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) {

        Consumer<String> onSuccess = (String response) -> {
            log.info("response in onSuccess: "+response);

        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
        };

        Runnable onCompletion = () -> {
            System.out.println("Message Completed");

        };

            HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .subscribe(onSuccess, onError, onCompletion);
    }
}

我还没有找到使用 subscribe() 的示例。在上面的 HttpClient 使用 subscribe() 的示例中,HttpServer 似乎没有返回响应。任何关于为什么会发生这种情况的煽动都会有所帮助。

最佳答案

在 Reactor Netty 示例中,使用 .block 是为了保持 main 线程处于 Activity 状态,因为我们不希望线程退出。 当您使用 .subscribe 时,您需要另一种机制来保持 main 线程处于 Activity 状态并防止其退出。 在您的示例中发生的情况是 main 线程退出,并且您看不到结果。

您可以使用例如CountDownLatch

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) throws Exception {

        CountDownLatch latch = new CountDownLatch(1);

        Consumer<String> onSuccess = (String response) -> {
            log.info("response in onSuccess: "+response);

        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
            latch.countDown();
        };

        Runnable onCompletion = () -> {
            System.out.println("Message Completed");
            latch.countDown();
        };

            HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .subscribe(onSuccess, onError, onCompletion);

            latch.await();
    }
}

关于java - Reactor Netty 在 HttpClient 订阅时未获取 HttpServer 响应,只有在 HttpClient 阻塞时才获取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61843235/

相关文章:

java - 如何使用 JPQL 从 Spring Data Repository 获取 HashMap 结果?

android - RxJava/RxKotlin 提示访问 View

javascript - 缓冲条形码扫描仪的击键

java - 如何用Netty搭建FTP服务器?

java - TIME_WAIT 中的 Netty 连接

c++ - 如何在 rxcpp 自定义运算符中正确推断泛型

java - 是否有识别模式本身的 Java 智能日期 API?

java - 解密使用openssl、oaep padding模式加密的非对称 key

java - 我可以在 OpenGL ES 2.0 中使用哪些版本的 GLSL?

java - 如何在netty聊天服务器端正确实现GUI