java - WebFlux 链接调用多个服务和响应聚合

标签 java spring-webflux project-reactor

我最近开始使用 WebFlux,需要有关如何链接多个服务并聚合响应的建议。这 4 个服务及其响应 POJO 类似于以下示例:

class Response1{
   String a1;
   String a2;
}

class Response2{
   String b1;
}

class Response3{
   String c1;
}

class Response4{
   String d1;
}

以及4个服务的签名:

Flux<Response1> service1(); 
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)

因此需要为 Flux 中的每个 Response1 调用 service2,为每个 Response2 调用 service3。模型之间的关系是:

Response1 <1-----*>Response2 (1 to many), 
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)

聚合的最终响应应类似于 (JSON):

[
  {
    "a1": "",
    "a2": "",
    "d1": "",
    "response2s": [
      {
        "b1": "",
        "response3s": [
          {
            "c1": ""
          }
        ]
      }
    ]
  }
]

所以首先我需要调用Service1,然后为每个Response1调用service2,然后为每个Response2(由service2返回)调用service3。此外,为 service1 返回的每个response1 调用service4(可以与service2 和service3 调用并行调用)。为了更新聚合的最终响应,我添加了两个额外的 POJO 以允许存储子响应,例如(相关位):

public class AggResponse extends Response1{
    List<AggResponse2> response2s;// populated from service2 response
    String d1; // populated from service4 response

    public void add(AggResponse2 res2){
        if(response2s == null)
            response2s = new ArrayList<>();
        response2s.add(res2);
    }
}

public class AggResponse2 extends Response2{
    List<Response3> response3s;// populated from service3 response

    public void add(Response3 res3) {
        if (response3s == null)
            response3s = new ArrayList<>();
        response3s.add(res3);
    }
}

如何最好地进行链接,以便保留以前的响应数据,并在组合运算符时保留 AggResponse 对象中的所有数据?我尝试了以下方法:

public Flux<AggResponse> aggregate() {
    return services.service1()
            .map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
            .flatMap(aggRes -> services.service2(aggRes.getA1())
                    .map(res2 -> {
                        AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
                        aggRes.add(aggRes2);
                        return aggRes2;
                    })
                    .flatMap(aggRes2 -> services.service3(aggRes2.getB1())
                            .map(res3 -> {
                                aggRes2.add(res3);
                                return res3;
                            })
                            .reduce(aggRes2, (a, aggRes3) -> aggRes2)
                    )
                    .reduce(aggRes, (a, aggRes2) -> aggRes)
            )
            .flatMap(aggRes -> services.service4(aggRes.getA1())
                    .map(res4 -> {
                        aggRes.setD1(res4.getD1());
                        return aggRes;
                    })
            );
}

但是,我收到以下不完整的回复:

[ {
  "a1" : "a1v1",
  "a2" : "a2v1"
} ]

当我看到日志时,我看到所有服务都被调用。两个问题: 1.为什么看不到聚合响应,是否可以减少丢失它? 2.是否有更好的方法来实现这一目标?

最佳答案

您可以使用merge如果您不想等待,请使用此方法 service2next为您发出信号service4 。像这样的事情:

return service1().flatMap(response1 ->
        Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
                .reduce((aggResponse, aggResponse2) -> new AggResponse(
                        response1.a1,
                        response1.a2,
                        Optional.ofNullable(aggResponse.d1)
                                .orElse(aggResponse2.d1),
                        Optional.ofNullable(aggResponse.response2s)
                                .orElse(aggResponse2.response2s))));

实用程序类和方法:

class AggContainer {
    final String b1;
    final List<Response3> response3s;

    AggContainer(String b1, List<Response3> response3s) {
        this.b1 = b1;
        this.response3s = response3s;
    }
}

class AggResponse {
    final String a1;
    final String a2;
    final String d1;
    final List<AggContainer> response2s;

    AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
        this.a1 = a1;
        this.a2 = a2;
        this.d1 = d1;
        this.response2s = response2s;
    }

    AggResponse(String d1) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = d1;
        this.response2s = null;
    }

    AggResponse(List<AggContainer> response2s) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = null;
        this.response2s = response2s;
    }
}

private Mono<AggResponse> service23Agg(String a1) {
    return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
            .map(response3s -> new AggContainer(response2.b1, response3s)))
            .collectList()
            .map(AggResponse::new);
}

private Mono<AggResponse> service4Agg(String a2) {
    return service4(a2).map(response4 -> new AggResponse(response4.d1));
}

并且您应该非常小心异步环境中的可变集合。避免在响应式(Reactive)管道内更改它。

关于java - WebFlux 链接调用多个服务和响应聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54543039/

相关文章:

unit-testing - Project Reactor block() 与 StepVerifier 中的正确测试模式

java - java剩余天数计算

java - RestDocumentation 类已弃用,我应该使用什么来代替?

java - 如何修复 Spring 中的 JSON 解码错误?

java - Spring WebFlux Reactor - 更新 Flux 中的对象

java - Spring Webflux Reactor 上下文

spring - 不使用 block()/blockFirst()/blockLast() 将单声道转换为对象 java

java - 读取和写入 DVD/CD - Java

java - 什么是原始类型,为什么我们不应该使用它呢?

java - NoSuchBeanDefinitionException : No qualifying bean of type 'org.springframework.boot.web.reactive.error.ErrorAttributes' available: