spring-boot - 如何并行调用多个Spring Webclient并等待结果?

标签 spring-boot kotlin spring-webflux project-reactor spring-webclient

我是 Reactive 编程的新手,我想并行调用两个 API 并处理结果并返回一个简单的数组或项目列表。

我有两个函数,一个返回一个 Flux,另一个返回一个 Mono,我根据那个 Mono 的结果对 Flux 发出的项目做了一个非常简单的过滤逻辑。

我尝试使用 zipWith,但无论采用何种过滤逻辑,只有一个项目到达了最后。我也尝试过 block 但在 Controller 内部不允许这样做:/

@GetMapping("/{id}/offers")
fun viewTaskOffers(
        @PathVariable("id") id: String,
        @AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> {
    data class TaskOfferPair(
        val task: TaskDTO,
        val offer: ViewOfferDTO
    )

    return client.getTaskOffers(id).map {
            it.toViewOfferDTO()
        }.zipWith(client.getTask(id), BiFunction {
            offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
        }).filter {
            it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
        }.map {
            it.offer
        }
}
  • getTaskOffers 返回 OfferDTO
  • 的 Flux
  • getTask 返回 TaskDTO
  • 的 Mono

如果您不能回答我的问题,请至少告诉我如何并行执行多个 API 调用并在 WebClient 中等待结果

最佳答案

这是一个并行调用的用例。

public Mono<UserInfo> fetchCarrierUserInfo(User user) {
        Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
        Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());

        return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
            UserInfo userInfo = tuple.getT1();
            userInfo.setCarrier(tuple.getT2());
            return userInfo;
        });
    }

这里:

  • fetchUserInfo 进行 http 调用以从另一个服务获取用户信息并返回 Mono
  • fetchCarrierInfo 方法进行 HTTP 调用以从另一个服务获取 carrierInfo 并返回 Mono
  • Mono.zip() 将给定的 monos 合并到一个新的 Mono 中,当所有给定的 Monos 都生成一个项目时,该 Mono 将完成,并将它们的值聚合到一个 Tuple2 中。

然后调用fetchCarrierUserInfo().block()得到最终结果。

关于spring-boot - 如何并行调用多个Spring Webclient并等待结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56467836/

相关文章:

JSON 解析错误 : Can not construct instance of java. time.LocalDate: 没有从字符串值反序列化的字符串参数构造函数/工厂方法

java - Spring boot应用程序创建两个数据库行

kotlin - 同一类 Kotlin 中的 @Provides 和 @Binds 方法

java - 如何使用 Mono & Flux 限制并发 http 请求

java - 如何在Spring WebClient中设置和处理超时?

java - 无法访问 AWS Tomcat 上的 MySQL 数据库,但可以在本地 Tomcat 上正常工作。 (JDBS 连接错误)

java - 无法在 Jboss-eap-6.4 中部署 Springboot 应用程序

kotlin - kotlin如何共享常见的改造响应处理程序代码

android - 当我构建我的应用程序时,如果它们属于另一个模块,导入将成为未解析的引用

java - Project Reactor-如何处理Flux.interval的OverflowException?