java - 如何使用 spring webClient 等待所有 http 请求完成?

标签 java resttemplate spring-webflux project-reactor spring-webclient

我想对每个队列元素执行http请求。这些请求应并行调用。
我还需要等待所有请求的终止。

我开发了以下代码:

 List<Mono<MyResponseDTO>> monoList = queue.stream()
                .map(jobStatusBunch -> webClient
                        .post()
                        .uri("localhost:8080/api/some/url")
                        .bodyValue(convertToRequestDto(someBean))
                        .retrieve()
                        .toEntity(String.class)
                        .filter(HttpEntity::hasBody)
                        .map(stringResponseEntity -> {
                            try {
                                return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
                            } catch (JsonProcessingException e) {
                                log.error("Can't parse", e);
                                return null;
                            }
                        })
                        .doOnNext(myResponseDTO -> {
                            log.info("doOnNext is invoked");
                        })
                ).collect(Collectors.toList());
          //await when all MONOs are completed

log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
        .flatMap(Function.identity())
        .then();
log.info("Finished waiting for {}", monoList);

当队列有单个元素时,我看到以下日志:

2019-11-19 19:17:17.733  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [   scheduling-1] o.s.w.r.f.client.ExchangeFunctions       : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230  INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService     : doOnNext is invoked

因此此代码不允许等待请求终止。

我怎样才能实现它?

附注

看起来 Flux.merge(monoList).blockLast() 是我需要的。它能正常工作吗?

最佳答案

简单案例

使用它来并行执行请求并等待它们完成:

List<Mono<MyResponseDTO>> monoList = queue
        .stream()
        .map(requestDTO ->
                webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(requestDTO)
                    .retrieve()
                    .bodyToMono(MyResponseDTO.class))
        .collect(Collectors.toList());

// This will execute all requests in parallel and wait until they complete,
// or throw an exception if any request fails.
List<MyResponseDTO> responses = Flux.merge(monoList).collectList().block();

验证

您可能希望将 reactor.netty.http.client 的日志记录设置为 DEBUG 以确保不会发出额外的请求。例如,如果您不小心同时使用了 mono#subscribemono#block,则可能会发生这种情况。

使用 CompletableFuture 实现更复杂的情况

如果你想将响应的处理和等待请求完成分开,那么可以使用CompletableFutures:

List<Mono<MyResponseDTO>> webClientMonos = getMonos();

// Start executing requests in parallel.
List<CompletableFuture<MyResponseDTO>> futures = webClientMonos.stream()
        .map(mono -> mono.toFuture())
        .collect(toList());
for (CompletableFuture<MyResponseDTO> future : futures) {
    future.thenAccept(responseDTO -> {
        // Do something with a response when it arrives at some point.
        // ...
    });
}

// ...

// Block until all requests have completed.
for (CompletableFuture<MyResponseDTO> future : futures) {
    try {
        // Maybe WebClient has been configured with timeouts,
        // but it doesn't hurt to have a timeout here, too.
        future.get(60, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    } catch (ExecutionException | TimeoutException ex) {
        // ExecutionException is thrown if HTTP request fails.
        throw new RuntimeException(ex);
    }
}

关于java - 如何使用 spring webClient 等待所有 http 请求完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58939365/

相关文章:

java - 无法在 JavaFX XML 文件中解析正确的图像 url

java - restTemplate 向 Netty Server 请求挂起线程

java - 调试 RestTemplate 发布请求

reactive-programming - 订阅后函数现在可以正常执行

java - 解码时处理 JAXB 中的转义字符

java - 由 : java. lang.IllegalStateException 引起:尝试重新打开已关闭的对象:SQLiteDatabase

Java WebFlux/Reactor - 使用仅在 Mono<Set<String>> 中找到的项目过滤 Flux<List<String>>

java - 分散-聚集 : combine set of Mono<List<Item>> into single Mono<List<Item>>

java - 我无法弄清楚这个 ConcurrentModificationException

java - restTemplate 交换经常会导致 400 错误