multithreading - 异步填充 Java Map 并将其作为 future 返回

标签 multithreading asynchronous java-8 future completable-future



目前,我可以使用 CompletableFuture.runAsync(Runnable, Executor) 在 map 本身中异步创建每个单独的对象。类似于下面的示例代码,但我不确定如何构建 Future/CompletableFuture用于返回 Map 的类型机制准备就绪时:

public static class AsynchronousMapPopulator {

    private final Executor backgroundJobExecutor;

    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;

    public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
        // TODO: This blocks until the training is done; Instead, return a
        // future to the caller somehow
        return result;

但是,使用上面的代码,当代码调用 AsynchronousTest.create(Map<String,Integer) ,它已经阻塞,直到该方法返回完全填充的 ConcurrentMap<String,Integer> ;我怎样才能把它变成类似 Future<Map<String,Integer>> 的东西以便我以后可以使用它?:
Executor someExecutor = ForkJoinPool.commonPool();
Future<Map<String,Integer>> futureClassModels = new AsynchronousMapPopulator(someExecutor).apply(wordClassObservations);
// Do lots of other stuff
Map<String,Integer> completedModels = futureClassModels.get();


正如@Holger 在他的评论中所说,您必须避免调用 .join()并依靠thenApply()相反,例如像这样:

public static class AsynchronousMapPopulator {

    private final Executor backgroundJobExecutor;

    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;

    public Future<Map<String, Integer>> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
        // using thenApply instead of join here:
        return CompletableFuture.allOf(
            ).thenApply(x -> result);

关于multithreading - 异步填充 Java Map 并将其作为 future 返回,我们在Stack Overflow上找到一个类似的问题:


java - 使用多线程时如何深度复制 HashMap

java - 每个请求的线程 tcp 服务器

c# - 等待 IEnumerable 项目,(等待后等待)

Java 7 -> Java 8 : AES Causes exception: "BadPaddingException: Given final block not properly padded" in conjunction with BufferedReader & ZipStreams

java-8 - 引用特定对象的实例方法会破坏 Java 中的类型安全吗?

android - 等待 Activity 显示

c# - 简单快速的异步二进制 TCP 套接字服务器?

c# - 等待来自 SendGrid 的 API 返回代码时应用程序卡住

java - 如何使用 Java 8 流获取具有映射值的列表的嵌套分组

java - Java中如何隔离线程间的变量?