java - 缓存和使缓存的 Mono 失效

标签 java reactive-programming spring-webflux project-reactor spring-webclient

我在尝试缓存 WebClient 返回的 Mono 时遇到问题。代码是这样的:

public Mono<Token> authenticate() {
    return cachedTokenMono = cachedTokenMono
        .switchIfEmpty(
            Mono.defer(() -> 
                    getToken())
                    .cache(token ->
                               Duration.between(Instant.now(), token.getExpires().toInstant()),
                           (Throwable throwable) -> Duration.ZERO,
                           () -> Duration.ZERO));
}

目的是缓存用于接收 token Mono,直到 token 过期。 token 过期后,缓存的 Mono 变空,并请求新的 token 。 这按预期工作,但不幸的是 switchIfEmpty() 实际上并没有“切换”,而是包装了源代码 Mono。结果,随着创建越来越多的包装 SwitchIfEmptyMono,这会造成内存泄漏。 在这种情况下,正确的模式是什么?有没有办法用新的 Mono 替换空的?

最佳答案

你可以这样做:

private final Mono<Token> authenticateMono = getToken()
            .cache(
                    token -> Duration.between(Instant.now(), token.getExpires().toInstant()),
                    throwable -> Duration.ZERO,
                    () -> Duration.ZERO)

public Mono<Token> authenticate() {
    return authenticateMono;
}

这个想法是您返回相同的缓存 Mono<Token>每次调用 authenticate() 的实例。 .cache运算符确保为每个订阅检查缓存的结果。

具体:

  • 如果有新的订阅到达并且没有缓存值,那么缓存操作符将订阅 Mono<Token>getToken() 返回(这将触发 token 检索)。
  • 如果某个值已被缓存,并且新的订阅在缓存超时之前到达,则缓存运算符会将缓存的值发送给新订阅者
  • 如果某个值已被缓存,并且在缓存超时后有新订阅到达,则缓存运算符将重新订阅 Mono<Token>getToken() 返回(这将触发 token 重新检索)。
  • 如果 Mono<Token>getToken() 返回完成时出现异常,那么该异常将不会被缓存,因此会传播,并且到达的下一个订阅将再次重新触发 token 检索

这一切都假设:

  • getToken()在订阅者到达之前不执行任何操作
  • getToken()检索每个订阅者的 token
  • 您只想为所有订阅者提供一个有效 token

另请注意,根据您的使用案例,您可能希望在 token 到期日期之前稍微使 token 过期,以解决时钟偏差。即在新 token 实际过期之前抢先检索它,以防止返回 Token在下游有机会使用它之前就会过期。

关于java - 缓存和使缓存的 Mono 失效,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58867228/

相关文章:

java - 如何理解线程?

swift - RxSwift和Moya同步请求

java - 无法启动 spring boot web 应用程序

spring-cloud-loabalancer 配置静态服务器列表

java - Spring Webflux : Extract a value from a Mono and save it into another variable

java - 需要在不按回车键的情况下进行按键操作

java - Android升级到 'com.google.android.gms:play-services-ads:18.1.0'后编译报错

java - 在 RxJava 中同时从 Socket 读取和写入

spring-boot - 如何将 Spring WebClient 与 Jetty 一起使用,而不是 Netty?

java - 编写自定义 Lint 规则以确保在每次使用之前进行空检查以避免 NullPointerException