我正在尝试在响应式(Reactive)编程中创建一个有弹性的 sse(服务器发送事件)客户端。 sse 端点经过身份验证,因此我必须为每个请求添加授权 header 。
授权 token 将在 1 小时后过期。
下面是我的代码片段
webClient.get()
.uri("/events")
.headers(httpHeaders -> httpHeaders.setBearerAuth(authService.getIdToken()))
.retrieve()
.bodyToFlux(ServerSideEvent.class)
.timeout(Duration.ofSeconds(TIMEOUT))
.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(RETRY_DELAY)))
.subscribe(
content -> {
handleEvent(content);
},
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
如果 1 小时后由于任何原因失去连接,此代码将停止工作,因为 token 已过期。
如何将 token 刷新到重试逻辑中或以其他方式刷新? 谢谢
最佳答案
您可以使用网络客户端过滤器。
过滤器可以拦截和修改客户端请求和响应。 示例:
WebClient.builder().filter((request, next) -> {
ClientRequest newReuqest = ClientRequest.from(request)
.header("Authorization", "YOUR_TOKEN")
.build();
return next.exchange(newRequest);
}).build();
更新:
抱歉,没有看清楚您的问题。试试这个:
假设服务器在 token 过期时返回 401 代码。
WebClient.builder().filter((request, next) -> {
final Mono<ClientResponse> response = next.exchange(request);
return response.filter(clientResponse -> clientResponse.statusCode() != HttpStatus.UNAUTHORIZED)
// handle 401 Unauthorized (token expired)
.switchIfEmpty(next.exchange(ClientRequest.from(request)
.headers(httpHeaders -> httpHeaders.setBearerAuth(getNewToken()))
.build()));
}).build();
或者你可以缓存你的token(例如保存到redis并在一小时内设置TTL),当redis中的token为空时,获取新的然后再次保存到redis。
关于java - 更新 Flux WebClient 中的授权 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72540727/