我有一个整数列表,对于每个整数,我想从数据库中获取一个字符串值,并将其放入 Map 中,其中 Integer 是来自父列表的 int 值,String 是从中获取的字符串值数据库。我还想要获取字符串以并行运行。
这就是我使用 RxJava 实现它的方式。我的代码可以工作,并且得到了预期的结果,但我不认为获取名称是并行运行的。
public Observable<Map<Integer, String>> getVitalsForConnectedDevices(int accountId) {
List<Integer> ids = Lists.newArrayList(2, 3, 5);
return Observable<Map<Integer, String>> obs = Observable.from(ids)
.flatMap((Func1<Integer, Observable<Map.Entry<Integer, String>>>) integer
-> Observable.just(Maps.immutableEntry(integer, deviceDAO.getVitalName(integer)))
.subscribeOn(Schedulers.io()), 20)
.toMap(entry -> entry.getKey(), entry -> entry.getValue());
}
这是 getVitalName() 方法
public String getVitalName(int vitalId) {
log.debug("id: " + vitalId);
String query = "SELECT name FROM vitals WHERE vital_id=?";
String name = v1JdbcTemplate.queryForObject(query, String.class, vitalId);
log.debug("name: " + name);
return name;
}
它按以下顺序打印上述函数的调试语句:
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 2
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Steps
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 3
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Floors
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 5
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Distance
如果它是并行运行的,它不应该打印所有 id 的第一个和后面的名称吗?我在这里做错了什么?如何让它们并行运行?
最佳答案
shouldn't it be printing all the id's first and names later
答案是否定的,因为您正在从数据库中为每个 id 请求一个字符串。当结果完成后,您将 Id 和 String 打包到一个 Entry 中并将其推送到管道中。请注意,只有当所有 flatMap 都完成或发出错误时,toMap 才会成功完成。因此,您可能需要等待很长时间才能得到最终结果。
作为示例:您迭代了 5 个 id。所有请求都是并行触发的。服务器因负载而陷入困境,因此某些请求将需要一些时间。假设所有请求都已完成,只有一个请求已完成。只有当最后一个请求完成时,结果 Map<> 才会被推送给订阅者。如果这是您想要的,那么我建议返回 Single<> 而不是 Observable。
这是一个带有输出的测试:
@Test
void name() throws Exception {
Observable<Tuple2<Integer, String>> tuple2Observable = Observable.just(1, 2, 3, 4, 5, 6)
.flatMap(integer ->
Observable.fromCallable(() -> getVitalName(integer))
.subscribeOn(Schedulers.io())
.doOnNext(s -> System.out.println("Value:: " + Thread.currentThread().getName() + "-" + Instant.now()))
.map(s -> Tuple.of(integer, s))
).doOnComplete(() -> System.out.println("Finished:: " + Thread.currentThread().getName() + "-" + Instant.now()));
tuple2Observable.test()
.await();
}
public String getVitalName(int vitalId) throws Exception {
System.out.println("getVitalName method called with vitalId = " + vitalId + "-" + Thread.currentThread().getName() + "-" + Instant.now());
Thread.sleep(500);
String name = "le fake value";
return name;
}
您会看到,调用确实是在不同线程上同时进行的,并且可观察对象在所有请求完成后完成。
getVitalName method called with vitalId = 4-RxCachedThreadScheduler-4-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 6-RxCachedThreadScheduler-6-2017-10-08T21:20:43.786Z
getVitalName method called with vitalId = 5-RxCachedThreadScheduler-5-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 1-RxCachedThreadScheduler-1-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 2-RxCachedThreadScheduler-2-2017-10-08T21:20:43.784Z
getVitalName method called with vitalId = 3-RxCachedThreadScheduler-3-2017-10-08T21:20:43.787Z
Value:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.303Z
Value:: RxCachedThreadScheduler-6-2017-10-08T21:20:44.303Z
Value:: RxCachedThreadScheduler-1-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-2-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-3-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-5-2017-10-08T21:20:44.304Z
Finished:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.317Z
关于java - RxJava : async calls in a loop,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46635844/