java - 等待 vertx 中多个可观察对象的响应

标签 java rx-java vert.x

我正在使用 vertx-rx-java

我有一个处理程序,我需要通过 EventBus 发出 2 个不同的请求,并使用这两个请求的响应创建响应。

public handle(RoutingContext context) {
....some code...    

    Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
    Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
    ... TODO ...
}

基本上,我需要组合两个请求结果并将它们放入 RoutingContext 响应中。问题是我不完全理解如何以 rxjava 风格做到这一点。 我能想到的唯一方法就是这样:

firstRequest.doOnSuccess(resp1 -> {
  secondRequest.doOnSuccess(resp2 -> {

  });
});

但我认为这是一个不好的方法,因为如果有 10 个请求而不是 2 个怎么办?此代码将有 10 个嵌套调用。

有没有更好的方法来组合多个请求结果?

最佳答案

zip 运算符可用于关联多个源的排放,其区别在于它仅在每个基础源排放时才排放。所以...

  • 如果有两个底层源,zip 将成对发出。
  • 如果存在三个底层源,zip 将发出三元组。
  • ...等等

要亲身了解我的意思,您可以引用RxMarbles页面,并在观察底部流的同时处理顶部两个流中的排放。

有了这样的理解,您就可以使用 zip运算符来组合 Message 回复的结果,如下所示:

Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
    // ...do stuff with the replies and compose some result
    //    to be handled in onSuccess()
    return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
    result -> {
        System.out.println("## onSuccess(" + result + ")");
    },
    error -> {
        System.err.println("## onError(" + error.getMessage() + ")");
    }
);

如果任一传递失败,则将触发 onError 处理程序。否则将触发onSuccess

如果,正如您所提到的,您想要立即处理大量请求,则有一个重载变体 zip接受 Iterable 源。在你的情况下,可能看起来像这样:

final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);

Single.zip(requests, replies -> {
    // ...do stuff with the array of replies
    return null;
})
.subscribe(...);

希望有帮助!

关于java - 等待 vertx 中多个可观察对象的响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52432972/

相关文章:

java - 如何阻止线程等待 vert.x 中的响应?

java - 如何删除 Spring Boot 应用程序中的白色标签错误页面?

java - servlet 仅显示数据库中的一项数据而不是全部

java - 在 Tomcat 上部署 Vert.x

java - 使用 Rxjava 检测值变化

java - 无法解析符号 AndroidScheduler

java - 垂直x。在一次调用 update() 中执行多次插入

java - 在Java中运行selenium webdriver测试用例时出现参数错误

java - 循环错误打印表

Android Api 调用列表中的项目