我想对每个队列元素执行http请求。这些请求应并行调用。
我还需要等待所有请求的终止。
我开发了以下代码:
List<Mono<MyResponseDTO>> monoList = queue.stream()
.map(jobStatusBunch -> webClient
.post()
.uri("localhost:8080/api/some/url")
.bodyValue(convertToRequestDto(someBean))
.retrieve()
.toEntity(String.class)
.filter(HttpEntity::hasBody)
.map(stringResponseEntity -> {
try {
return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
} catch (JsonProcessingException e) {
log.error("Can't parse", e);
return null;
}
})
.doOnNext(myResponseDTO -> {
log.info("doOnNext is invoked");
})
).collect(Collectors.toList());
//await when all MONOs are completed
log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
.flatMap(Function.identity())
.then();
log.info("Finished waiting for {}", monoList);
当队列有单个元素时,我看到以下日志:
2019-11-19 19:17:17.733 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [ scheduling-1] o.s.w.r.f.client.ExchangeFunctions : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230 INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService : doOnNext is invoked
因此此代码不允许等待请求终止。
我怎样才能实现它?
附注
看起来 Flux.merge(monoList).blockLast()
是我需要的。它能正常工作吗?
最佳答案
简单案例
使用它来并行执行请求并等待它们完成:
List<Mono<MyResponseDTO>> monoList = queue
.stream()
.map(requestDTO ->
webClient
.post()
.uri("localhost:8080/api/some/url")
.bodyValue(requestDTO)
.retrieve()
.bodyToMono(MyResponseDTO.class))
.collect(Collectors.toList());
// This will execute all requests in parallel and wait until they complete,
// or throw an exception if any request fails.
List<MyResponseDTO> responses = Flux.merge(monoList).collectList().block();
验证
您可能希望将 reactor.netty.http.client
的日志记录设置为 DEBUG
以确保不会发出额外的请求。例如,如果您不小心同时使用了 mono#subscribe
和 mono#block
,则可能会发生这种情况。
使用 CompletableFuture 实现更复杂的情况
如果你想将响应的处理和等待请求完成分开,那么可以使用CompletableFutures:
List<Mono<MyResponseDTO>> webClientMonos = getMonos();
// Start executing requests in parallel.
List<CompletableFuture<MyResponseDTO>> futures = webClientMonos.stream()
.map(mono -> mono.toFuture())
.collect(toList());
for (CompletableFuture<MyResponseDTO> future : futures) {
future.thenAccept(responseDTO -> {
// Do something with a response when it arrives at some point.
// ...
});
}
// ...
// Block until all requests have completed.
for (CompletableFuture<MyResponseDTO> future : futures) {
try {
// Maybe WebClient has been configured with timeouts,
// but it doesn't hurt to have a timeout here, too.
future.get(60, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
} catch (ExecutionException | TimeoutException ex) {
// ExecutionException is thrown if HTTP request fails.
throw new RuntimeException(ex);
}
}
关于java - 如何使用 spring webClient 等待所有 http 请求完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58939365/