java - 在 Mono 上使用 elapsed() 函数可以吗?

标签 java spring-webflux project-reactor spring-data-redis-reactive

我正在尝试获取在响应式(Reactive)编程中从 Redis 读取数据的执行时间,在查找文档时我可以看到 elapsed() 方法将执行相同的操作并实现如下代码。

Flux.fromIterable(getActions(httpHeaders))
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(actionFact -> methodToReadFromCache(actionFact))
                .sequential();

public Mono<ActionFact> methodToReadFromCache(actionFact) {
    return Mono.fromCallable(() -> getKey(actionFact))
                .flatMap(cacheKey ->
                  redisOperations.hasKey(key)
                                .flatMap(aBoolean -> {
                                    if (aBoolean) {
                                        return redisOperations.opsForValue().get(cacheKey);
                                    }
                                    return authzService.getRolePermissions(actionFact)
                                            .flatMap(policySetResponse ->
                                                    //save in cache
                                            );
                                })
                                .elapsed()
                                .flatMap(lambda -> {
                                    LOG.info("cache/service processing key:{}, time:{}", key, lambda.getT1());
                                    return Mono.just(lambda.getT2());
                                });

输出:

cache/service processing key:KEY1, time:3 
cache/service processing key:KEY2, time:4 
cache/service processing key:KEY3, time:18 
cache/service processing key:KEY4, time:34 
cache/service processing key:KEY5, time:46 
cache/service processing key:KEY6, time:57 
cache/service processing key:KEY7, time:70 
cache/service processing key:KEY8, time:81 
cache/service processing key:KEY9, time:91 
cache/service processing key:KEY10, time:103
cache/service processing key:KEY11, time:112
cache/service processing key:KEY12, time:121
cache/service processing key:KEY13, time:134
cache/service processing key:KEY14, time:146
cache/service processing key:KEY15, time:159

我预计每个缓存请求所花费的时间将小于 5 毫秒,就像第一个和第二个请求一样,但情况并非如此。 elapsed() 是否将当前的获取时间添加到累计时间中?据我了解,通量发出的每个项目都是独立的?

最佳答案

Mono#elapsed 测量 Mono 被订阅到 Mono 发出一个项目 (onNext )。

在您的情况下,导致订阅和计时器启动的原因是调用 methodToReadFromCache 的外部并行 flatMap

导致 onNext 以及计时的是 hasKey 和 if/else 部分的组合 (redisOperations.opsForValue().get(cacheKey)authzService)。

外部的 flatMap 应该至少有与 CPU 一样多的计时器,因为我们处于并行模式。

但时间偏差这一事实可能暗示着某些东西要么被阻塞,要么容量有限。例如,redisTemplate 是否一次只能处理几个键?

关于java - 在 Mono 上使用 elapsed() 函数可以吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56969306/

相关文章:

java - 同一包和不同包中的父类(super class)的子类有什么区别?

java - Webflux订阅者

spring - 在 Flux react 器中过滤重复对象

java - Reactor 中 `groupBy` 组的并行调度

登录项目 react 器的生产

java - 比较两点

java - Thymeleaf 支持 Flexbox 吗?

java - JUnit 5 中的 TestName 规则等效于什么?

kotlin - 为什么执行挂起的存储库函数而不执行非挂起的存储库函数?

spring-boot - 结合非阻塞和阻塞调用并在 Spring Webflux 中返回结果