java - 当 ForkJoinPool 结束时

标签 java java-8 parallel-processing java-stream

我试图找出所有 ForkJoinPool 线程何时完成了它们的任务。 我编写了这个测试应用程序(我使用了 System.out,因为它只是一个快速测试应用程序,并且没有错误检查/处理):

public class TestForkJoinPoolEnd {
    private static final Queue<String> queue = new LinkedList<>();
    private static final int MAX_SIZE = 5000;
    private static final int SPEED_UP = 100;

    public static void main(String[] args) {
        ForkJoinPool customThreadPool = new ForkJoinPool(12);
        customThreadPool.submit(
                () -> makeList()
                        .parallelStream()
                        .forEach(TestForkJoinPoolEnd::process));
        enqueue("Theard pool started up");

        int counter = MAX_SIZE + 1;
        while (!customThreadPool.isTerminating()) {
            String s = dequeue();
            if (s != null) {
                System.out.println(s);
                counter--;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {

            }
        }
        System.out.println("counter = " + counter);
        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +
                "= " + customThreadPool.isTerminating() + " isTerminated = "
                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());
    }

    static List<String> makeList() {
        return Stream.generate(() -> makeString())
                .limit(MAX_SIZE)
                .collect(Collectors.toList());
    }

    static String makeString() {
        int leftLimit = 97; // letter 'a'
        int rightLimit = 122; // letter 'z'
        int targetStringLength = 10;
        Random random = new Random();
        StringBuilder buffer = new StringBuilder(targetStringLength);
        for (int i = 0; i < targetStringLength; i++) {
            int randomLimitedInt = leftLimit + (int)
                    (random.nextFloat() * (rightLimit - leftLimit + 1));
            buffer.append((char) randomLimitedInt);
        }
        return buffer.toString();
    }

    static int toSeed(String s) {
        int sum = 0;
        for (int i = 0; i < s.length(); i++) {
            sum += s.charAt(i);
        }
        return (sum / SPEED_UP);
    }

    static void process(String s) {
        StringBuilder sb = new StringBuilder(s);
        long start = System.currentTimeMillis();
        try {
            TimeUnit.MILLISECONDS.sleep(toSeed(s));
        } catch (InterruptedException e) {

        }
        long end = System.currentTimeMillis();
        sb.append(" slept for ")
                .append((end - start))
                .append(" milliseconds");
        enqueue(sb.toString());
    }

    static void enqueue(String s) {
        synchronized (queue) {
            queue.offer(s);
        }
    }

    static String dequeue() {
        synchronized (queue) {
            return queue.poll();
        }
    }
}

这段代码卡住了,永远不会完成。如果我将 while 循环的条件更改为 !customThreadPool.isQuiescent(),它会终止循环并将计数器和队列大小设置为 1。

我应该使用什么来确定线程何时完成?

最佳答案

ExecutorService 不会仅仅因为一项作业(及其子作业)完成而自行终止。线程池背后的整体理念是可重用。

因此它只会在应用程序对其调用 shutdown() 时终止。

您可以使用 isQuiescent() 来查明是否没有待处理的作业,这仅在所有提交的作业都属于您的特定任务时才有效。使用 submit 返回的 future 来检查实际作业的完成情况要简洁得多。

无论哪种情况,排队任务的完成状态都不会说明您正在轮询的队列。当您了解提交结束时,您仍然需要检查队列中是否有未决元素。

此外,建议使用线程安全的 BlockingQueue 实现,而不是用 synchronized block 装饰 LinkedList。连同其他一些需要清理的东西,代码看起来像这样:

public class TestForkJoinPoolEnd {
    private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();
    private static final int MAX_SIZE = 5000;
    private static final int SPEED_UP = 100;

    public static void main(String[] args) {
        ForkJoinPool customThreadPool = new ForkJoinPool(12);
        ForkJoinTask<?> future = customThreadPool.submit(
            () -> makeList()
                    .parallelStream()
                    .forEach(TestForkJoinPoolEnd::process));
        QUEUE.offer("Theard pool started up");

        int counter = MAX_SIZE + 1;
        while (!future.isDone()) try {
            String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);
            if (s != null) {
                System.out.println(s);
                counter--;
            }
        } catch (InterruptedException e) {}

        for(;;) {
            String s = QUEUE.poll();
            if (s == null) break;
            System.out.println(s);
            counter--;
        }
        System.out.println("counter = " + counter);
        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +
                "= " + customThreadPool.isTerminating() + " isTerminated = "
                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());

        customThreadPool.shutdown();
    }

    static List<String> makeList() {
        return IntStream.range(0, MAX_SIZE)
            .mapToObj(i -> makeString())
            .collect(Collectors.toList());
    }

    static String makeString() {
        int targetStringLength = 10;
        Random random = new Random();
        StringBuilder buffer = new StringBuilder(targetStringLength);
        for (int i = 0; i < targetStringLength; i++) {
            int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';
            buffer.append((char) randomLimitedInt);
        }
        return buffer.toString();
    }

    static int toSeed(String s) {
        return s.chars().sum() / SPEED_UP;
    }

    static void process(String s) {
        long start = System.nanoTime();
        try {
            TimeUnit.MILLISECONDS.sleep(toSeed(s));
        } catch (InterruptedException e) {

        }
        long end = System.nanoTime();
        QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");
    }
}

如果您在接收端的 sleep 调用应该模拟一些工作负载而不是等待新项目,您还可以使用

int counter = MAX_SIZE + 1;
while (!future.isDone()) {
    String s = QUEUE.poll();
    if (s != null) {
        System.out.println(s);
        counter--;
    }
    try {
        TimeUnit.MILLISECONDS.sleep(1);
    } catch (InterruptedException e) {}
}

但逻辑没有改变。在 future.isDone() 返回 true 后,我们必须重新检查队列中的未决元素。我们只保证不会有新元素到达,而不能保证队列已经为空。

作为旁注,makeString() 方法可以进一步改进以

static String makeString() {
    int targetStringLength = 10;
    ThreadLocalRandom random = ThreadLocalRandom.current();
    StringBuilder buffer = new StringBuilder(targetStringLength);
    for (int i = 0; i < targetStringLength; i++) {
        int randomLimitedInt = random.nextInt('a', 'z' + 1);
        buffer.append((char)randomLimitedInt);
    }
    return buffer.toString();
}

甚至

static String makeString() {
    int targetStringLength = 10;
    return ThreadLocalRandom.current()
        .ints(targetStringLength, 'a', 'z'+1)
        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
        .toString();
}

关于java - 当 ForkJoinPool 结束时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57655847/

相关文章:

java - 如何在 Java 中格式化 XML 模式验证错误消息?

java - 方法引用转换如何工作?

c++ - OpenMP 只创建一个线程

java - Java中继承的静态方法中获取类名

java - 有没有更好的方法可以使用 Stream 进行编写?

c - C有模板吗?

parallel-processing - 两个大文件彼此的平行余弦相似度

java - 一个衬垫检查两个十进制字符串仅相差 1 ulp

java - Java 中的 SSL 超重

java - 如何将 Java 变量插入 MySQL 查询?