就是这样。真的找不到更接近的东西。我想实现以下任务链
List<Item> items = Collections.singletonList(new Item("John Smith", "1997-2014"));
Stream<CompletableFuture<List<ScrappingResult>>> scrappingFutures =
items.stream().map(item ->
CompletableFuture.supplyAsync(() -> new ScrappingTask(item).call()));
Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
scrappingFutures.map(resultsFuture -> ???);
Stream<CompletableFuture<TrimmingResult>> trimmingResults = scrappingFuturesUnwrapped.map(resultFuture ->
// thenCompose?
resultFuture.thenCompose(result -> {
Path clipsDir = Paths.get("./"
+ result.getItem().getName()
+ "/" + result.getItem().getTimespan());
AtomicInteger clipIdx = new AtomicInteger();
return result.getVideo().getClips().stream()
.map(clip -> CompletableFuture.supplyAsync(() ->
new TrimmingTask(
ffmpegPath,
result.getVideo().getVideoUrl(),
clip,
clipsDir.resolve("clip_" + clipIdx.incrementAndGet() + ".mp3")).call())
);
});
);
最后一行在句法上不正确,但我希望能表达意思。所以,我想做两次像 flatMap 这样的事情并得到
Stream<CompletableFuture<TrimmingResult>>
在最后。
我该怎么做?
谢谢。
最佳答案
如果我没有误解你的意图,你只想压平结果。您可以使用 Spliterator
延迟接收结果,并使用 flatMap
将 Stream
合并为扁平流,例如:
Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
scrappingFutures.flatMap(each -> unwrap(each));
static <T> Stream<? extends CompletableFuture<T>>
unwrap(CompletableFuture<? extends List<? extends T>> master) {
return generate(new Predicate<Consumer<? super CompletableFuture<T>>>() {
private Iterator<? extends T> cursor;
@Override
public boolean test(Consumer<? super CompletableFuture<T>> consumer) {
cursor = cursor == null ? await().iterator() : cursor;
if (cursor.hasNext()) {
consumer.accept(completedFuture(cursor.next()));
return true;
}
return false;
}
// v--- blocked at the first time
private List<? extends T> await() {
try {
return master.get();
} catch (InterruptedException | ExecutionException e) {
throw new CompletionException(e);
}
}
});
}
static <T> Stream<? extends CompletableFuture<T>>
generate(Predicate<Consumer<? super CompletableFuture<T>>> generator) {
long unknownSize = Long.MAX_VALUE;
return stream(new AbstractSpliterator<CompletableFuture<T>>(unknownSize, 0) {
public boolean tryAdvance(Consumer<? super CompletableFuture<T>> action) {
return generator.test(action);
}
}, false);
}
总结
上面的解决方案只是我的第一个想法,在这种情况下它不是最好的方法,你可以用 big design first 来考虑它。 .然而,即使它是一个糟糕的解决方案,但我会把它保留在这里,因为它可能会给其他人以其他方式思考。有关更多详细信息,您可以从此处查看@Holger 的评论和there .
我承认最好的方法就是@Holger在下面说的,既然没有人写下来,请让我记录下来,以服务更多人。
Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
scrappingFutures.flatMap(each -> each.join().stream())
.map(CompletableFuture::completedFuture);
关于java - 如何将 CompletableFuture<Stream<T>> 转换为 Stream<CompletableFuture<T>>?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48461315/