java - CompletableFuture 异步执行多个数据库查询

标签 java multithreading java-8 parallel-processing completable-future

我想并行执行多个数据库查询并将结果存储在 map 中。我正在尝试这样做,但是当我访问 map 时 map 没有完全填充。

我做错了什么吗?

 public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {

         Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x));

         return instrumentsEdgesMap;

}

任何帮助将不胜感激,提前致谢。

最佳答案

在上述方法中 supplyAsync将由 Async 执行来自 ForkJoinPool 的线程,但是 thenApply方法总是通过调用线程来执行。因此,您的查询将按顺序一个接一个地运行,而不是异步的

All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).



这是例子
CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName());
        return "SupplyAsync";
    }).thenAccept(i->{
    System.out.println(Thread.currentThread().getName()+"--"+i);
    });

输出:
ForkJoinPool.commonPool-worker-3
main--SupplyAsync

因此,如果您希望您的流程是 Async然后首先使用 supplyAsync 触发所有三个数据库查询并捕获 CompletableFuture 内的输出
CompletableFuture<Set<String>> first =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp));

CompletableFuture<Set<String>> second =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp));

CompletableFuture<Set<String>> third =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp));

然后现在用其中三个创建一个流,然后将它们收集到 Map
Stream.of(new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.ABC, first),
              new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.XYZ, second),
              new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.DEF, third))
       .forEach(entry->{
           entry.getValue().thenAccept(val-> instrumentsEdgesMap.put(entry.getKey(), val));
       });

关于java - CompletableFuture 异步执行多个数据库查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58866096/

相关文章:

java - 无法在 Solaris 上初始化类 sun.awt.X11GraphicsEnvironment

java - 嵌套Optional.map的优雅替代品?

Java类加载器: findClass is not called again after ClassNotFoundException occurs

java - notifyAll() 分析时的调用差异数

java - 将带有分隔符的字符串映射到字符串列表或新对象

java - 直接从字符串创建流

java.lang.ClassCastException : android. app.Application - 类型转换

silverlight - Silverlight 后台线程中未处理异常的事件?

具有 2 个过滤条件的 java 流

c# - 使用带有 WaitCallback 方法的 ThreadPool 与简单方法有什么区别