我最近开始使用 WebFlux,需要有关如何链接多个服务并聚合响应的建议。这 4 个服务及其响应 POJO 类似于以下示例:
class Response1{
String a1;
String a2;
}
class Response2{
String b1;
}
class Response3{
String c1;
}
class Response4{
String d1;
}
以及4个服务的签名:
Flux<Response1> service1();
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)
因此需要为 Flux 中的每个 Response1 调用 service2,为每个 Response2 调用 service3。模型之间的关系是:
Response1 <1-----*>Response2 (1 to many),
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)
聚合的最终响应应类似于 (JSON):
[
{
"a1": "",
"a2": "",
"d1": "",
"response2s": [
{
"b1": "",
"response3s": [
{
"c1": ""
}
]
}
]
}
]
所以首先我需要调用Service1,然后为每个Response1调用service2,然后为每个Response2(由service2返回)调用service3。此外,为 service1 返回的每个response1 调用service4(可以与service2 和service3 调用并行调用)。为了更新聚合的最终响应,我添加了两个额外的 POJO 以允许存储子响应,例如(相关位):
public class AggResponse extends Response1{
List<AggResponse2> response2s;// populated from service2 response
String d1; // populated from service4 response
public void add(AggResponse2 res2){
if(response2s == null)
response2s = new ArrayList<>();
response2s.add(res2);
}
}
和
public class AggResponse2 extends Response2{
List<Response3> response3s;// populated from service3 response
public void add(Response3 res3) {
if (response3s == null)
response3s = new ArrayList<>();
response3s.add(res3);
}
}
如何最好地进行链接,以便保留以前的响应数据,并在组合运算符时保留 AggResponse 对象中的所有数据?我尝试了以下方法:
public Flux<AggResponse> aggregate() {
return services.service1()
.map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
.flatMap(aggRes -> services.service2(aggRes.getA1())
.map(res2 -> {
AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
aggRes.add(aggRes2);
return aggRes2;
})
.flatMap(aggRes2 -> services.service3(aggRes2.getB1())
.map(res3 -> {
aggRes2.add(res3);
return res3;
})
.reduce(aggRes2, (a, aggRes3) -> aggRes2)
)
.reduce(aggRes, (a, aggRes2) -> aggRes)
)
.flatMap(aggRes -> services.service4(aggRes.getA1())
.map(res4 -> {
aggRes.setD1(res4.getD1());
return aggRes;
})
);
}
但是,我收到以下不完整的回复:
[ {
"a1" : "a1v1",
"a2" : "a2v1"
} ]
当我看到日志时,我看到所有服务都被调用。两个问题: 1.为什么看不到聚合响应,是否可以减少丢失它? 2.是否有更好的方法来实现这一目标?
最佳答案
您可以使用merge
如果您不想等待,请使用此方法 service2
的next
为您发出信号service4
。像这样的事情:
return service1().flatMap(response1 ->
Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
.reduce((aggResponse, aggResponse2) -> new AggResponse(
response1.a1,
response1.a2,
Optional.ofNullable(aggResponse.d1)
.orElse(aggResponse2.d1),
Optional.ofNullable(aggResponse.response2s)
.orElse(aggResponse2.response2s))));
实用程序类和方法:
class AggContainer {
final String b1;
final List<Response3> response3s;
AggContainer(String b1, List<Response3> response3s) {
this.b1 = b1;
this.response3s = response3s;
}
}
class AggResponse {
final String a1;
final String a2;
final String d1;
final List<AggContainer> response2s;
AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
this.a1 = a1;
this.a2 = a2;
this.d1 = d1;
this.response2s = response2s;
}
AggResponse(String d1) {
this.a1 = null;
this.a2 = null;
this.d1 = d1;
this.response2s = null;
}
AggResponse(List<AggContainer> response2s) {
this.a1 = null;
this.a2 = null;
this.d1 = null;
this.response2s = response2s;
}
}
private Mono<AggResponse> service23Agg(String a1) {
return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
.map(response3s -> new AggContainer(response2.b1, response3s)))
.collectList()
.map(AggResponse::new);
}
private Mono<AggResponse> service4Agg(String a2) {
return service4(a2).map(response4 -> new AggResponse(response4.d1));
}
并且您应该非常小心异步环境中的可变集合。避免在响应式(Reactive)管道内更改它。
关于java - WebFlux 链接调用多个服务和响应聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54543039/