java - RxJava : async calls in a loop

标签 java loops asynchronous rx-java

我有一个整数列表,对于每个整数,我想从数据库中获取一个字符串值,并将其放入 Map 中,其中 Integer 是来自父列表的 int 值,String 是从中获取的字符串值数据库。我还想要获取字符串以并行运行。

这就是我使用 RxJava 实现它的方式。我的代码可以工作,并且得到了预期的结果,但我不认为获取名称是并行运行的。

public Observable<Map<Integer, String>> getVitalsForConnectedDevices(int accountId) {
    List<Integer> ids = Lists.newArrayList(2, 3, 5);

    return Observable<Map<Integer, String>> obs = Observable.from(ids)
            .flatMap((Func1<Integer, Observable<Map.Entry<Integer, String>>>) integer
                    -> Observable.just(Maps.immutableEntry(integer, deviceDAO.getVitalName(integer)))
                    .subscribeOn(Schedulers.io()), 20)
            .toMap(entry -> entry.getKey(), entry -> entry.getValue());
}

这是 getVitalName() 方法

public String getVitalName(int vitalId) {
    log.debug("id: " + vitalId);
    String query = "SELECT name FROM vitals WHERE vital_id=?";
    String name = v1JdbcTemplate.queryForObject(query, String.class, vitalId);
    log.debug("name: " + name);
    return name;
}

它按以下顺序打印上述函数的调试语句:

09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 2
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Steps
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 3
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Floors
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 5
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Distance

如果它是并行运行的,它不应该打印所有 id 的第一个和后面的名称吗?我在这里做错了什么?如何让它们并行运行?

最佳答案

shouldn't it be printing all the id's first and names later

答案是否定的,因为您正在从数据库中为每个 id 请求一个字符串。当结果完成后,您将 Id 和 String 打包到一个 Entry 中并将其推送到管道中。请注意,只有当所有 flatMap 都完成或发出错误时,toMap 才会成功完成。因此,您可能需要等待很长时间才能得到最终结果。

作为示例:您迭代了 5 个 id。所有请求都是并行触发的。服务器因负载而陷入困境,因此某些请求将需要一些时间。假设所有请求都已完成,只有一个请求已完成。只有当最后一个请求完成时,结果 Map<> 才会被推送给订阅者。如果这是您想要的,那么我建议返回 Single<> 而不是 Observable。

这是一个带有输出的测试:

@Test
void name() throws Exception {
    Observable<Tuple2<Integer, String>> tuple2Observable = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap(integer ->
                    Observable.fromCallable(() -> getVitalName(integer))
                            .subscribeOn(Schedulers.io())
                            .doOnNext(s -> System.out.println("Value:: " + Thread.currentThread().getName() + "-" + Instant.now()))
                            .map(s -> Tuple.of(integer, s))
            ).doOnComplete(() -> System.out.println("Finished:: " + Thread.currentThread().getName() + "-" + Instant.now()));

    tuple2Observable.test()
            .await();
}

public String getVitalName(int vitalId) throws Exception {
    System.out.println("getVitalName method called with vitalId = " + vitalId + "-" + Thread.currentThread().getName() + "-" + Instant.now());

    Thread.sleep(500);

    String name = "le fake value";
    return name;
}

您会看到,调用确实是在不同线程上同时进行的,并且可观察对象在所有请求完成后完成。

getVitalName method called with vitalId = 4-RxCachedThreadScheduler-4-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 6-RxCachedThreadScheduler-6-2017-10-08T21:20:43.786Z
getVitalName method called with vitalId = 5-RxCachedThreadScheduler-5-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 1-RxCachedThreadScheduler-1-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 2-RxCachedThreadScheduler-2-2017-10-08T21:20:43.784Z
getVitalName method called with vitalId = 3-RxCachedThreadScheduler-3-2017-10-08T21:20:43.787Z
Value:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.303Z
Value:: RxCachedThreadScheduler-6-2017-10-08T21:20:44.303Z
Value:: RxCachedThreadScheduler-1-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-2-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-3-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-5-2017-10-08T21:20:44.304Z
Finished:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.317Z

关于java - RxJava : async calls in a loop,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46635844/

相关文章:

Java循环根据整数值和字母字符输出模式

python - 如果不满足条件,如何防止脚本崩溃?

python - 在 Python 循环中创建多个变量/字符串

javascript - Angular fire 2 async 调用一次,但在第一个回调完成之前不处理第二个回调

c# - Parallel.ForEach 同时保留顺序

winforms - 如何异步使用SaveFileDialog?

java - 当服务器在触发事件时发送电子邮件时,是否有必要在 HttpServlet 中使用serialVersionUID

java - RemoteWebDriver 和 WebElement 的通用接口(interface)

java - 从简单的 JSP spring MVC 页面获取值(value)

java - 找到两个线性等式成立的整数集