java - 总是调用 Mono switchIfEmpty()

标签 java spring-boot reactive-programming spring-webflux project-reactor

我有两个方法。
主要方法:

@PostMapping("/login")
public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
    return socialService.verifyAccount(loginUser)
            .flatMap(socialAccountIsValid -> {
                if (socialAccountIsValid) {
                    return this.userService.getUserByEmail(loginUser.getEmail())
                            .switchIfEmpty(insertUser(loginUser))
                            .flatMap(foundUser -> updateUser(loginUser, foundUser))
                            .map(savedUser -> {
                                String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                return new ResponseEntity<>(HttpStatus.OK);
                            });
                } else {
                    return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                }
            });

}

以及这个调用的方法(服务调用外部 api):

public Mono<User> getUserByEmail(String email) {
    UriComponentsBuilder builder = UriComponentsBuilder
            .fromHttpUrl(USER_API_BASE_URI)
            .queryParam("email", email);
    return this.webClient.get()
            .uri(builder.toUriString())
            .exchange()
            .flatMap(resp -> {
                if (Integer.valueOf(404).equals(resp.statusCode().value())) {
                    return Mono.empty();
                } else {
                    return resp.bodyToMono(User.class);
                }
            });
} 

在上面的示例中,switchIfEmpty() 总是从 main 方法调用,即使返回的结果是 Mono.empty()

我找不到这个简单问题的解决方案。
以下也不起作用:

Mono.just(null) 

因为该方法会抛出一个NullPointerException

我也不能使用 flatMap 方法来检查 foundUser 是否为 null。
遗憾的是,如果我返回 Mono.empty(),flatMap 根本不会被调用,所以我也不能在此处添加条件。

@SimY4

   @PostMapping("/login")
    public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
        userExists = false;
        return socialService.verifyAccount(loginUser)
                .flatMap(socialAccountIsValid -> {
                    if (socialAccountIsValid) {
                        return this.userService.getUserByEmail(loginUser.getEmail())
                                .flatMap(foundUser -> {
                                    return updateUser(loginUser, foundUser);
                                })
                                .switchIfEmpty(Mono.defer(() -> insertUser(loginUser)))
                                .map(savedUser -> {
                                    String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                    return new ResponseEntity<>(HttpStatus.OK);
                                });
                    } else {
                        return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                    }
                });

    }

最佳答案

这是因为 switchIfEmpty “按值”接受 Mono。这意味着甚至在您订阅单声道之前,这个替代单声道的评估就已经触发。

想象这样一个方法:

Mono<String> asyncAlternative() {
    return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }));
}

如果您这样定义代码:

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

无论在流构建过程中发生什么情况,它都会始终触发 alternative。要解决此问题,您可以使用 Mono.defer

推迟对第二个单声道的评估
Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

这样它只会在请求替代时打印“你好”

更新:

详细说明我的回答。您面临的问题与 Reactor 无关,而与 Java 语言本身以及它如何解析方法参数有关。让我们检查一下我提供的第一个示例中的代码。

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

我们可以将其重写为:

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = asyncAlternative();
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

这两个代码片段在语义上是等价的。我们可以继续解包看看问题出在哪里:

Mono<String> firstMono = Mono.just("Some payload");
CompletableFuture<String> alternativePromise = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }); // future computation already tiggered
Mono<String> alternativeMono = Mono.fromFuture(alternativePromise);
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

如您所见,当我们开始编写 Mono 类型时, future 的计算已经触发。为了防止不需要的计算,我们可以将我们的 future 包装到延迟评估中:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

将解包成

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }))); // future computation defered
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

在第二个例子中, future 被困在一个懒惰的供应商中,并且仅在被请求时才被安排执行。

UPD:2022:

自从一段时间以来, react 器项目带有一个替代 API,用于包装急切计算的 future ,这导致相同的结果——在懒惰的供应商中捕获急切计算:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.fromCompletionStage(() -> alternativePromise()));

关于java - 总是调用 Mono switchIfEmpty(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54373920/

相关文章:

Spring Cron Expression 每周二晚上 9 点运行?

c# - 为什么我不需要在这个冷的 Observable 上发布?

c# - 订户丢失消息;这是 Rx 的错误还是我做错了?

java - 在android上开发游戏: rotation of text in a canvas

java - 如何在 php 脚本中执行 Java 程序?

java - Java中不使用replace函数替换字符

java - 带有 @MappedSuperclass 的自定义 AuditorAware getCurrentAuditor

spring-mvc - Spring Data Elasticsearch @Document indexName 在运行时定义

javascript - 如何为 RxJS 中的两个可观察对象设置 'wait'

java - 谓词方法在是/否/否则回答不回答的情况下返回重复提示