java - 如何立即将任务从一个线程池传输到另一个线程池?

标签 java threadpool executorservice

我有一个输入元素列表,我想将其排队到多个线程池中。假设这是我的输入:

final List<Integer> ints = Stream.iterate(1, i -> i + 1).limit(100).collect(Collectors.toList());

这些是我希望元素依次运行的三个函数:

final Function<Integer, Integer> step1 =
        value -> { // input from the ints list
            return value * 2;
        };

final Function<Integer, Double> step2 =
        value -> { // input from the previous step1
            return (double) (value * 2); //
        };

final Function<Double, String> step3 =
        value -> { // input from the previous step2
            return "Result: " + value * 2;
        };

这些将是每个步骤的池:

final ExecutorService step1Pool = Executors.newFixedThreadPool(4);
final ExecutorService step2Pool = Executors.newFixedThreadPool(3);
final ExecutorService step3Pool = Executors.newFixedThreadPool(1);

我希望每个元素都通过 step1Pool 运行并应用 step1。一旦完成一个元素,其结果应该 最终在 step2pool 中,以便可以在此处应用 step2。一旦 step2Pool 中的某件事完成,它应该 应应用在 step3Poolstep3 中排队的内容。 在我的主线程上,我想等到获得 step3 的所有结果。每个元素的处理顺序 没关系。只是它们都在正确的线程池上运行 step1 -> step2 -> step3

基本上我想并行化Stream.map,将每个结果立即推送到下一个队列,然后等到我完成 从我最后一个线程池中获取了 ints.size() 结果。

有没有一种简单的方法可以用Java实现呢?

最佳答案

我相信 CompletableFuture 会为您提供帮助!

List<CompletableFuture<String>> futures = ints.stream()
            .map(i -> CompletableFuture.supplyAsync(() -> step1.apply(i), step1Pool)
                    .thenApplyAsync(step2, step2Pool)
                    .thenApplyAsync(step3, step3Pool))
            .collect(Collectors.toList());
List<String> result = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());

关于java - 如何立即将任务从一个线程池传输到另一个线程池?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55592491/

相关文章:

java - 如果基类具有带参数的构造函数,如何添加不带参数的构造函数

java - newCachedThreadPool如何复用线程?

java - 由 : java. lang.OutOfMemoryError: Java heap space 引起

java - 在 tomcat servlet 例程中使用异步 HTTP 客户端更好吗?

c# - 报告线程进度的最佳方式

java - 了解 Java 执行器服务

java - 如何在多线程程序中保存线程变量的值

java - 从数据库中搜索数据..?

java - 我可以从 native c++ 检测 Java 版本吗

java - InterviewBit 中的 LinkedList 函数