我正在使用 vertx-rx-java
我有一个处理程序,我需要通过 EventBus
发出 2 个不同的请求,并使用这两个请求的响应创建响应。
public handle(RoutingContext context) {
....some code...
Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
... TODO ...
}
基本上,我需要组合两个请求结果并将它们放入 RoutingContext
响应中。问题是我不完全理解如何以 rxjava 风格做到这一点。
我能想到的唯一方法就是这样:
firstRequest.doOnSuccess(resp1 -> {
secondRequest.doOnSuccess(resp2 -> {
});
});
但我认为这是一个不好的方法,因为如果有 10 个请求而不是 2 个怎么办?此代码将有 10 个嵌套调用。
有没有更好的方法来组合多个请求结果?
最佳答案
zip
运算符可用于关联多个源的排放,其区别在于它仅在每个基础源排放时才排放。所以...
- 如果有两个底层源,
zip
将成对发出。 - 如果存在三个底层源,
zip
将发出三元组。 - ...等等
要亲身了解我的意思,您可以引用RxMarbles页面,并在观察底部流的同时处理顶部两个流中的排放。
有了这样的理解,您就可以使用 zip运算符来组合 Message
回复的结果,如下所示:
Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
// ...do stuff with the replies and compose some result
// to be handled in onSuccess()
return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
result -> {
System.out.println("## onSuccess(" + result + ")");
},
error -> {
System.err.println("## onError(" + error.getMessage() + ")");
}
);
如果任一传递失败,则将触发 onError
处理程序。否则将触发onSuccess
。
如果,正如您所提到的,您想要立即处理大量请求,则有一个重载变体 zip接受 Iterable
源。在你的情况下,可能看起来像这样:
final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);
Single.zip(requests, replies -> {
// ...do stuff with the array of replies
return null;
})
.subscribe(...);
希望有帮助!
关于java - 等待 vertx 中多个可观察对象的响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52432972/