java - 如何使用 RxJava2 将不同的异步源聚合为一个?

标签 java asynchronous rx-java2

假设我有这个同步方法:

public FruitBowl getFruitBowl() {
    Apple apple = getApple(); // IO intensive
    Banana banana = getBanana(); // CPU intensive
    return new FruitBowl(apple, banana);
}

我可以使用 Java 并发 API 将其转换为异步方法,结果有点像这样:

public Future<FruitBowl> getFruitBowl() {
    Future<Apple> appleFuture = getAppleAsync(); // IO intensive
    Future<Banana> bananaFuture = getBananaAsync(); // CPU intensive
    return createFruitBowlAsync(appleFuture, bananaFuture); // Awaits appleFuture and bananaFuture and then returns a new FruitBowl
}

在利用调度程序(io 和计算)并返回 Single 的同时,惯用的 Rx 方式是什么?

最佳答案

您可以使用zip运算符(operator)。并为每个异步操作定义一个不同的线程。如果不这样做,这些方法将在同一线程上依次执行。

我会创建一个Observable两种方法的版本,以便分别返回 Observable<Apple>Observable<Banana>并以这种方式使用它们:

Observalbe.zip(getAppleObservable().subscribeOn(Schedulers.newThread()), 
               getBananaObservable().subscribeOn(Schedulers.newThread()),
     (apple, banana) -> new FruitBowl(apple, banana)))
     .subscribe(/* do your work here with FruitBowl object */);

Here有关如何使用 zip 并行化操作的更多详细信息运算符

关于java - 如何使用 RxJava2 将不同的异步源聚合为一个?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47177248/

相关文章:

javascript - 等待深度函数调用的深度异步等待结果?

asynchronous - F# 异步文件复制

java - RXJava Retrofit 返回 POST HTTP 错误 500

java - RxJava - 在 Observable<List<Service>> 中等待所有服务的 Observable<Boolean>

android - doOnSubscribe 在主线程上被调用

java - 使用 AT 命令发送短信的名称而不是号码

java - Hibernate 配置错误(找不到元素 'hibernate-configuration' 的声明)

java: 不能在抽象类中使用构造函数

java - 如何设置Android静音模式的时间

node.js - 如何将 Node.js 中到达速度过快的事件输入一一写入数据库