java - 如何将 CompletableFuture<Stream<T>> 转换为 Stream<CompletableFuture<T>>?

标签 java java-8 completable-future

就是这样。真的找不到更接近的东西。我想实现以下任务链

List<Item> items = Collections.singletonList(new Item("John Smith", "1997-2014"));

Stream<CompletableFuture<List<ScrappingResult>>> scrappingFutures =
    items.stream().map(item ->
        CompletableFuture.supplyAsync(() -> new ScrappingTask(item).call()));

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
    scrappingFutures.map(resultsFuture -> ???);

Stream<CompletableFuture<TrimmingResult>> trimmingResults = scrappingFuturesUnwrapped.map(resultFuture ->
    // thenCompose?
    resultFuture.thenCompose(result -> {
        Path clipsDir = Paths.get("./"
            + result.getItem().getName()
            + "/" + result.getItem().getTimespan());

        AtomicInteger clipIdx = new AtomicInteger();

        return result.getVideo().getClips().stream()
            .map(clip -> CompletableFuture.supplyAsync(() ->
                new TrimmingTask(
                    ffmpegPath,
                    result.getVideo().getVideoUrl(),
                    clip,
                    clipsDir.resolve("clip_" + clipIdx.incrementAndGet() + ".mp3")).call())
            );
    });
);

最后一行在句法上不正确,但我希望能表达意思。所以,我想做两次像 flatMap 这样的事情并得到 Stream<CompletableFuture<TrimmingResult>>在最后。

我该怎么做?

谢谢。

最佳答案

如果我没有误解你的意图,你只想压平结果。您可以使用 Spliterator 延迟接收结果,并使用 flatMapStream 合并为扁平流,例如:

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> unwrap(each));

static <T> Stream<? extends CompletableFuture<T>> 
  unwrap(CompletableFuture<? extends List<? extends T>> master) {

    return generate(new Predicate<Consumer<? super CompletableFuture<T>>>() {

        private Iterator<? extends T> cursor;

        @Override
        public boolean test(Consumer<? super CompletableFuture<T>> consumer) {
            cursor = cursor == null ? await().iterator() : cursor;
            if (cursor.hasNext()) {
                consumer.accept(completedFuture(cursor.next()));
                return true;
            }
            return false;
        }

        //                        v--- blocked at the first time
        private List<? extends T> await() {
            try {
                return master.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new CompletionException(e);
            }
        }
    });
}

static <T> Stream<? extends CompletableFuture<T>> 
  generate(Predicate<Consumer<? super CompletableFuture<T>>> generator) {

    long unknownSize = Long.MAX_VALUE;
    return stream(new AbstractSpliterator<CompletableFuture<T>>(unknownSize, 0) {
        public boolean tryAdvance(Consumer<? super CompletableFuture<T>> action) {
            return generator.test(action);
        }
    }, false);
}

总结

上面的解决方案只是我的第一个想法,在这种情况下它不是最好的方法,你可以用 big design first 来考虑它。 .然而,即使它是一个糟糕的解决方案,但我会把它保留在这里,因为它可能会给其他人以其他方式思考。有关更多详细信息,您可以从此处查看@Holger 的评论和there .

我承认最好的方法就是@Holger在下面说的,既然没有人写下来,请让我记录下来,以服务更多人。

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> each.join().stream())
                               .map(CompletableFuture::completedFuture);

关于java - 如何将 CompletableFuture<Stream<T>> 转换为 Stream<CompletableFuture<T>>?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48461315/

相关文章:

java - 如何使用 BorderLayout/GroupLayout 从 JPanel 中删除特定元素?

java - 如何使用Java 8的默认方法在接口(interface)中使用EntityManager实例?

java - JDK 已更新,但以前的版本仍显示为当前版本

java - 如何让 lambda 表达式在 Java 8 中定义 toString?

java - Hibernate搜索lucene : Updating index document by removing a child entity makes children entity as orphan in index document

java - 如何将图像从一个标签拖放到另一个标签?

java - 这2种方法的区别

java - 如果 completableFuture 失败,如何记录?

java - java.util.concurrent.CompletableFuture 中的异常传播

java - 调用 mySql 过程时出现错误