spring-webflux - 添加重试WebClient的所有请求

标签 spring-webflux project-reactor

我们有一个服务器来检索 OAUTH token ,并且通过 WebClient.filter 方法将 oauth token 添加到每个请求中
例如

webClient
                .mutate()
                .filter((request, next) -> tokenProvider.getBearerToken()
                        .map(token -> ClientRequest.from(request)
                                .headers(httpHeaders -> httpHeaders.set("Bearer", token))
                                .build()).flatMap(next::exchange))
                .build();
TokenProvider.getBearerToken returns Mono<String> since it is a webclient request (this is cached)

我想要一个重试功能,在 401 错误时,将使 token 无效并再次尝试请求
我有这样的工作
webClient.post()
            .uri(properties.getServiceRequestUrl())
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromObject(createRequest))
            .retrieve()
            .bodyToMono(MyResponseObject.class)
            .retryWhen(retryOnceOn401(provider))

private Retry<Object> retryOnceOn401(TokenProvider tokenProvider) {
        return Retry.onlyIf(context -> context.exception() instanceof WebClientResponseException && ((WebClientResponseException) context.exception()).getStatusCode() == HttpStatus.UNAUTHORIZED)
                .doOnRetry(objectRetryContext -> tokenProvider.invalidate());
    }

有没有办法将其移动到 webClient.mutate().....build() 函数?
以便所有请求都具有此重试功能?

我尝试添加为过滤器,但它似乎不起作用,例如
.filter(((request, next) -> next.exchange(request).retryWhen(retryOnceOn401(tokenProvider))))

有关解决此问题的最佳方法的任何建议?
问候

最佳答案

我发现了这一点,在看到重试仅适用于异常后很明显,webClient 不会抛出异常,因为 clientResponse 对象只保存响应,只有当 bodyTo 被调用时才会在 http 状态上抛出异常,所以要解决这个问题,可以模仿这种行为

@Bean(name = "retryWebClient")
    public WebClient retryWebClient(WebClient.Builder builder, TokenProvider tokenProvider) {
        return builder.baseUrl("http://localhost:8080")
                .filter((request, next) ->
                        next.exchange(request)
                            .doOnNext(clientResponse -> {
                                    if (clientResponse.statusCode() == HttpStatus.UNAUTHORIZED) {
                                        throw new RuntimeException();
                                    }
                            }).retryWhen(Retry.anyOf(RuntimeException.class)
                                .doOnRetry(objectRetryContext -> tokenProvider.expire())
                                .retryOnce())

                ).build();
    }

编辑重复/重试的功能之一是,它不会更改原始请求,在我的情况下,我需要检索新的 OAuth token ,但上面发送了相同的(过期) token 。
我确实想出了一种使用交换过滤器来做到这一点的方法,一旦 OAuth 密码流在 spring-security-2.0 中,我应该能够将其与 AccessTokens 等集成,但同时
ExchangeFilterFunction retryOn401Function(TokenProvider tokenProvider) {
        return (request, next) -> next.exchange(request)
                .flatMap((Function<ClientResponse, Mono<ClientResponse>>) clientResponse -> {
                    if (clientResponse.statusCode().value() == 401) {
                        ClientRequest retryRequest = ClientRequest.from(request).header("Authorization", "Bearer " + tokenProvider.getNewToken().toString()).build();
                        return next.exchange(retryRequest);
                    } else {
                        return Mono.just(clientResponse);
                    }
                });
    }

关于spring-webflux - 添加重试WebClient的所有请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50752021/

相关文章:

junit - 无法调用 "reactor.core.publisher.Mono.thenReturn(Object)",因为返回值为空

x509certificate - 谁有在Spring Cloud Gateway/Spring WebFlux中实现x509相互认证的简单例子?

java - 如何从 Reactor 核心中的 Mono<String> 中提取字符串

java - Spring Boot 2 Reactor Flux API 的 Angular 客户端

java - Flux 不等待 'then' 之前的元素完成

websocket - 使用 Micronaut Websocket 端点从未绑定(bind)的 Flux 发送更新

java - Spring Reactive - 重用Mono值

java - 有人可以解释一下 doOnSubscribe、doOnNext 和 doOnSuccess 没有被执行吗

thymeleaf - 使用 Spring WebFlux 和 Thymeleaf 进行重定向后获取

java - 更新 Flux WebClient 中的授权 header