java - Java 8 并行流中的自定义线程池

标签 java concurrency parallel-processing java-8 java-stream

是否可以为 Java 8 指定自定义线程池 parallel stream ?我在任何地方都找不到它。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大而且是多线程的,所以我想把它分开。我不希望在另一个模块的 applicationblock 任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数现实世界的情况下安全地使用并行流。

试试下面的例子。有一些 CPU 密集型任务在单独的线程中执行。 这些任务利用并行流。第一个任务被破坏了,所以每一步需要 1 秒(通过线程 sleep 模拟)。问题是其他线程卡住并等待中断的任务完成。这是一个人为的例子,但想象一个 servlet 应用程序和某人向共享 fork 连接池提交一个长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

最佳答案

实际上有一个技巧可以在特定的 fork-join 池中执行并行操作。如果您将它作为一个任务在一个 fork-join 池中执行,它会停留在那里并且不使用公共(public)池。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

诀窍是基于 ForkJoinTask.fork它指定:“安排在当前任务正在运行的池中异步执行此任务,如果适用,或者使用 ForkJoinPool.commonPool() 如果不是 inForkJoinPool() "

关于java - Java 8 并行流中的自定义线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21163108/

相关文章:

c++ - 无法使用 MPI_Send 和 MPI_Recv 发送 std::vector

Java Pelops 和 Cassandra NoSQL DB : Can I Batch Delete Rows?

java - 使用 Hibernate SQLQuery 返回 Postgres UUID

Java多线程线程随机终止

c++ - 为什么在使用 8 个生产者 1 个消费者进行测试时,golang channel 比 intel tbb concurrent_queue 快得多

concurrency - 聊天机器人 : ensuring serial processing of messages on a per-conversation basis in clustered environment

java - 寻找一种方法将 0xa5 和 0x9c 组合成 0xa59c

java - Android项目中的配置属性文件

linux - 并行计算 : how to share computing resources among users?

java - 创建三个 Mono 后立即并行执行它们,等待所有 Mono 完成并以特定顺序/逻辑收集结果