java - 从 WebClient 访问时,为什么 REST 端点的处理程序会被访问两次?

标签 java rest reactive-programming spring-webflux project-reactor

这是第二次尝试,修改了演示代码,希望能更好地说明问题。代码已被精简,删除了除演示所遇到问题的元素之外的所有元素。

添加注释:进行了一些额外的测试,并将结果作为答案发布(副扩展这篇文章)。这可能是“预期的行为”,但我仍在尝试理解“原因”。

代码“有效”,因为它返回预期的信息(字符串或字符串列表)。但是,当使用 WebClient 访问返回 Flux 的 REST 端点 (localhost:8080/test/DemoClient) 时,会对关联的处理程序 (DemoMainHandler.getAll( ))。我看不到在 DemoMainHandler.getAll() 上进行第二次调用,但我担心如果这种情况发生在生产环境中,可能会出现潜在的性能问题。

在提供的代码中,所有内容都在单个 Spring Webflux 应用程序下运行,因此 DemoClient 代码没有单独的进程。

访问 localhost:8080/test/DemoClient/2 处的 REST 端点似乎工作正常,向 Postman 返回一个值为“Only One”的 Mono。更重要的是,DemoMainHandler.getById()仅被调用一次。

但是,访问 localhost:8080/test/DemoClient 上的 REST 端点会产生一些令人担忧的结果。通过 Flux 返回给 Postman 的字符串值看起来没问题,但是

  1. DemoClientHandler.getAll()在访问 REST 端点时调用
  2. *DemoClientHandler.getAll() 调用 DemoClient.getAll()
  3. DemoClient.getAll() 使用 WebClient 访问 localhost:8080/test/DemoMain 处的 REST 端点
  4. DemoClient.getAll() 使用 flatMapMany 迭代返回的 ClientResponse 并从响应正文中提取 Flux
  5. DemoCLient.getAll() 生成的 Flux 返回给 DemoClientHandler.getAll()
  6. DemoClientHandler.getAll() 检查 Flux,确定它具有一个或多个元素,并将 ServerResponse 中的 Flux 返回给初始客户端(在此示例中)案例, postman )
  7. Postman 然后解压 Flux(map?flatMap?)并显示返回的字符串(“CallMeOnce”)

我不明白的是为什么DemoClientHandler.getAll()被第二次调用,如第二个System.out.println()输出所示控制台。它似乎与使用 Flux 作为返回类型有关?

添加注释:如果问题是由 .exchange().flatMapMany() 构造驱动的,我尝试使用 .retrieve().bodyToFlux() em> 构造(请参阅 DemoClient 中的注释代码)。相同的结果(即,DemoMainHandler.getAll() 似乎被调用了两次)。

控制台输出

2019-10-07 08:16:18.953  INFO 9384 --- [           main] c.example.testdupe.TestDupeApplication   : Starting TestDupeApplication on M7730-LFR with PID 9384 (D:\sandbox\TestDupe\build\classes\java\main started by LesR in D:\sandbox\TestDupe)
2019-10-07 08:16:18.953  INFO 9384 --- [           main] c.example.testdupe.TestDupeApplication   : No active profile set, falling back to default profiles: default
2019-10-07 08:16:20.062  INFO 9384 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2019-10-07 08:16:20.062  INFO 9384 --- [           main] c.example.testdupe.TestDupeApplication   : Started TestDupeApplication in 1.324 seconds (JVM running for 1.871)

***** Invoke localhost:8080/test/DemoClient/{id}
DemoClientHandler.getById( ServerRequest )
DemoClient.getById( 2 )
DemoMainHandler.getById( ServerRequest )

***** Invoke localhost:8080/test/DemoClient
DemoClientHandler.getAll( ServerRequest )
DemoClientHandler.getAll() >> BEFORE invoking demoClient.getAll()
DemoClient.getAll()
DemoClient.getAll() >> RETURN fluxString
DemoClientHandler.getAll() >>  AFTER invoking demoClient.getAll()
DemoMainHandler.getAll( ServerRequest )
DemoMainHandler.getAll( ServerRequest )

示例代码

@SpringBootApplication
public class TestDupeApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestDupeApplication.class, args);
    }
}


@Configuration
public class DemoClientRouter {

    @Bean
    public RouterFunction<ServerResponse> clientRoutes(DemoClientHandler requestHandler) {
        return nest(path("/test"),
                nest(accept(APPLICATION_JSON),
                        RouterFunctions.route(RequestPredicates.GET("/DemoClient"), requestHandler::getAll)
                                    .andRoute(RequestPredicates.GET("/DemoClient/{id}"), requestHandler::getById)));
    }
}


@Component
public class DemoClientHandler {

    @Autowired
    DemoClient demoClient;

    public Mono<ServerResponse> getAll(ServerRequest request) {
        System.out.println("DemoClientHandler.getAll( ServerRequest )");
        System.out.println("DemoClientHandler.getAll() >> BEFORE invoking demoClient.getAll()");
        Flux<String> fluxString = demoClient.getAll();
        System.out.println("DemoClientHandler.getAll() >>  AFTER invoking demoClient.getAll()");

        return fluxString.hasElements().flatMap(hasElement -> {
            return hasElement ? ServerResponse.ok()
                                              .contentType(MediaType.APPLICATION_JSON)
                                              .body(fluxString, String.class)
                              : ServerResponse.noContent().build();
        });
    }

    public Mono<ServerResponse> getById(ServerRequest request) {
        System.out.println("DemoClientHandler.getById( ServerRequest )");
        Mono<String> monoString;

        return demoClient.getById( 2 ).flatMap(stringVal -> ServerResponse.ok()
                                                                          .contentType(MediaType.APPLICATION_JSON)
                                                                          .body(Mono.just(stringVal), String.class))
                                      .switchIfEmpty(ServerResponse.notFound().build());
    }
}


@Component
public class DemoClient {

    private final WebClient client;

    public DemoClient() {
        client = WebClient.create();
    }

    public Flux<String> getAll() {
        System.out.println("DemoClient.getAll()");
        Flux<String> fluxString;

        fluxString = client.get().uri("http://localhost:8080/test/DemoMain")
                                 .accept(MediaType.APPLICATION_JSON)
                                 .exchange()
                                 .flatMapMany(response -> response.bodyToFlux(String.class));

        // fluxString = client.get().uri("http://localhost:8080/test/DemoMain")
        //                          .accept(MediaType.APPLICATION_JSON)
        //                          .retrieve()
        //                          .bodyToFlux(String.class);

        System.out.println("DemoClient.getAll() >> RETURN fluxString");
        return fluxString;
    }

    public Mono<String> getById(int id) {
        System.out.printf("DemoClient.getById( %d )%n", id);
        return client.get().uri("http://localhost:8080/test/DemoMain/" + id)
                           .accept(MediaType.APPLICATION_JSON)
                           .exchange()
                           .flatMap(response -> response.bodyToMono(String.class));
    }
}


@Configuration
public class DemoMainRouter {
    @Bean
    public RouterFunction<ServerResponse> demoPOJORoute(DemoMainHandler requestHandler) {
        return nest(path("/test"),
                nest(accept(APPLICATION_JSON),
                        RouterFunctions.route(RequestPredicates.GET("/DemoMain"), requestHandler::getAll)
                                    .andRoute(RequestPredicates.GET("/DemoMain/{id}"), requestHandler::getById)));
    }
}


@Component
public class DemoMainHandler {

    public Mono<ServerResponse> getAll(ServerRequest request) {
        System.out.println("DemoMainHandler.getAll( ServerRequest )");

        return ServerResponse.ok()
                             .contentType(MediaType.APPLICATION_JSON)
                             .body(Flux.just("Call", "Me", "Once"), String.class);
    }

    public Mono<ServerResponse> getById(ServerRequest request) {
        System.out.println("DemoMainHandler.getById( ServerRequest )");

        return ServerResponse.ok()
                             .contentType(MediaType.APPLICATION_JSON)
                             .body(Mono.just("Only One"), String.class);
    }
}

添加此代码是为了支持后续讨论...

@Component
public class DemoClient {

    private final WebClient client;

    public DemoClient() {
        client = WebClient.create();
    }

    public Flux<String> getAll() {
        Flux<String> fluxString;

        Mono<ClientResponse> monoCR = client.get().uri("http://localhost:8080/test/DemoMain")
                                                  .accept(MediaType.APPLICATION_JSON)
                                                  .exchange();

        fluxString = monoCR.flatMapMany(clientResponse -> clientResponse.bodyToFlux(String.class));

//        fluxString.subscribe();
//        return fluxString;
        return Flux.just("Foo", "Bar");
    }

最佳答案

后续讨论。这并不是一个真正的答案,但感觉它正朝着正确的方向前进。

修改了DemoClient.gatAll()以“解开”Flux/流上的操作,希望获得一些见解。这是我所做/发现的:

  1. 我引入了一个 Mono 变量来保存 localhost:8080/test/DemoMain 上的 WebClient 访问结果
  2. 我独立调用了monoCR.flatMapMany()来获取返回的Flux,并将Flux分配给fluxString
  3. 我添加了一个 fluxString.subscribe; 语句,只是为了能够订阅返回的 Flux,而不对其进行任何操作
  4. 我引入了 return Flux.just("Foo", "Bar"); 语句,只是为了在我选择不返回 fluxString 时返回一些内容

当我注释掉 fluxString.subscribe()'return FluxString; 语句时,DemoMainHandler.getAll() 没有输出>。我想这“并不奇怪”,因为没有任何内容订阅生成的 Flux,因此不会调用 DemoMainHandler.getAll(),因为不需要 Flux。

当我取消注释 fluxString.subscribe(); 但保留 return FluxString; 注释时,我看到 println() 输出>DemoMainHandler.getAll()。再说一次,我认为这“并不奇怪”,因为 Flux 现在正在被订阅,尽管结果没有做任何处理。因此,DemoMainHandler.getAll() 被调用并输出其 println() 内容。

最后,我注释掉了 fluxString.subscribe();return Flux.just("Foo", "bar"); 语句,并取消注释 *return FluxString;”。这会产生我一直在询问的 DemoMainHandler.getAll() 的两个 println() 输出。

根据简单订阅返回的 Flux 的结果,我假设 DemoMainHandler.getAll() 的第一个 println() 输出是代表 Postman(即“最终消费者”)的隐式订阅。但是,这仍然给我留下了一个问题“为什么从 DemoMainHandler.getAll() 输出第二个 println() ?” Reactor 是否真的在订阅时调用 DemoMainHandler.getAll() 一次,并在处理实际内容时调用第二次?或者?

似乎这种行为(即处理程序方法的两次调用)仅在返回 Flux 时才会发生(请参阅 DemoMainHandler.getById() 示例)。

关于java - 从 WebClient 访问时,为什么 REST 端点的处理程序会被访问两次?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58277541/

相关文章:

java - 如何持续观察对象直到 onError()/取消订阅

java - 如何从android应用程序连接到远程mysql数据库

java - Spring RestTemplate GET 请求返回 302 状态

c# - 建议在长时间运行的过程中使用 rx distinct?

android - 对重新创建的 Activity 实现改造回调的最佳实践?

design-patterns - 创建 REST 服务 API 的初学者指南(包括 RESTful 身份验证)

javascript - 如何从一个流获取在其他流的最后一个事件之后发生的事件

java - 我可以使用java格式化要写入CSV文件的数据吗

java - 使用 InstallCert.java 下载可信证书

java - 异步日志记录的多线程与 JMS 队列