java - What determines the number of threads a Java ForkJoinPool creates?

Tags: java parallel-processing threadpool fork-join

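As far as I understand ForkJoinPool, the pool creates a fixed number of threads (default: the number of cores) and never creates more threads (unless the application signals a need for them by using managedBlock).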
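A minimal sketch of the two points above, assuming Java 8 or later: `new ForkJoinPool()` takes its parallelism from `Runtime.availableProcessors()`, and `ForkJoinPool.managedBlock` is the documented way for a task to announce that it is about to block, so that the pool may activate or create a spare thread. The class `ParallelismDemo`, the `LatchBlocker` wrapper and the `waitInsidePool` helper are hypothetical names for illustration only; they are not part of the question's program.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ParallelismDemo {

    // Tells the pool that a worker is about to wait on a CountDownLatch,
    // so the pool may activate or create a spare worker while this one is blocked.
    static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        private final CountDownLatch latch;

        LatchBlocker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public boolean block() throws InterruptedException {
            latch.await();   // the actual blocking call
            return true;     // true: no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return latch.getCount() == 0; // latch already open: no need to block
        }
    }

    // Hypothetical helper: runs one pool task that waits on a latch through managedBlock
    static boolean waitInsidePool(ForkJoinPool pool) {
        CountDownLatch latch = new CountDownLatch(1);
        ForkJoinTask<?> waiter = pool.submit(() -> {
            try {
                ForkJoinPool.managedBlock(new LatchBlocker(latch));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        latch.countDown();   // open the latch so the waiting task can finish
        waiter.join();       // wait from outside the pool for the task to complete
        return waiter.isDone();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(); // parallelism defaults to availableProcessors()
        System.out.println("cores=" + Runtime.getRuntime().availableProcessors()
                + " parallelism=" + pool.getParallelism());
        System.out.println("managedBlock task finished: " + waitInsidePool(pool));
        pool.shutdown();
    }
}
```

Calling `latch.await()` directly inside a task would also work, but the pool would not know that the worker is blocked; wrapping the wait in a `ManagedBlocker` is what gives it the chance to compensate.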

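However, using ForkJoinPool.getPoolSize(), I found that in a program that creates 30,000 tasks (RecursiveTask), the ForkJoinPool executing those tasks uses 700 threads on average (I sampled the thread count each time a task was created). The tasks do no I/O, only pure computation; the only inter-task synchronization is calling ForkJoinTask.join() and accessing AtomicBooleans, i.e. there are no thread-blocking operations.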
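The measurement described above (sampling getPoolSize() as tasks are created and averaging the samples) could look roughly like the sketch below. The counters, the `sample` helper and the toy `Node` workload (a full binary fork/join recursion that samples once per fork) are hypothetical stand-ins, not the question's SolverTask; the pool sizes it prints depend on the JVM version and the machine.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class PoolSizeSampler {
    static final AtomicLong samples = new AtomicLong();       // number of samples taken
    static final AtomicLong sizeSum = new AtomicLong();       // sum of the sampled pool sizes
    static final AtomicInteger maxSize = new AtomicInteger(); // largest pool size seen

    // Hypothetical helper: record the current pool size
    static void sample(ForkJoinPool pool) {
        int size = pool.getPoolSize();
        samples.incrementAndGet();
        sizeSum.addAndGet(size);
        maxSize.accumulateAndGet(size, Math::max);
    }

    // Toy workload: full binary recursion of the given depth
    static final class Node extends RecursiveAction {
        final ForkJoinPool pool; // passed explicitly so sampling never depends on the calling thread
        final int depth;

        Node(ForkJoinPool pool, int depth) {
            this.pool = pool;
            this.depth = depth;
        }

        @Override
        protected void compute() {
            if (depth == 0) return;
            Node left = new Node(pool, depth - 1);
            Node right = new Node(pool, depth - 1);
            sample(pool);      // one sample per fork, i.e. per inner node
            left.fork();
            right.compute();   // run the other child in this thread
            left.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new Node(pool, 16));
        pool.shutdown();
        System.out.printf("samples=%d avgPoolSize=%.1f maxPoolSize=%d%n",
                samples.get(), (double) sizeSum.get() / samples.get(), maxSize.get());
    }
}
```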

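Since, as I understand it, join() does not block the calling thread, there is no reason for any thread in the pool to block, and therefore (I assumed) no reason to create more threads (which is obviously happening nonetheless).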

So why does ForkJoinPool create so many threads? Which factors determine the number of threads created?

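I had hoped this could be answered without posting code, but here it is, as requested. The code is an excerpt from a program four times its size, stripped down to the essential parts; it will not compile as is. If needed, I can of course post the full program as well.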

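The program uses depth-first search to find a path through a maze from a given start point to a given end point; a solution is guaranteed to exist. The main logic is in the compute() method of SolverTask: a task (a RecursiveTask) starts at some given point and continues with all neighbor points reachable from the current point. Rather than creating a new SolverTask at every branch point (which would create far too many tasks), it pushes all neighbors except one onto a backtrack stack for later processing and continues only with the one neighbor it did not push. Once it reaches a dead end this way, the point most recently pushed onto the backtrack stack is popped, and the search continues from there (cutting back the path built from the task's starting point accordingly).

Once a task finds that its backtrack stack has grown beyond a certain threshold, it switches to creating new tasks: from then on, it keeps popping from its backtrack stack until the stack is exhausted, but when it reaches a branch point it no longer pushes further points onto the stack and instead creates a new task for each such point. The size of the tasks can therefore be tuned with the stack-limit threshold.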

The figures I quoted above ("30,000 tasks, 700 threads on average") come from a search in a maze of 5000x5000 cells. So, here is the essential code:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking available: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathFromLocalStart, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathFromLocalStart.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) throws InterruptedException {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
    // since SolverTask.compute() expects its starting point already visited,
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}

Best answer

There are related questions on Stack Overflow:

ForkJoinPool stalls during invokeAll/join

ForkJoinPool seems to waste a thread

I made a runnable, stripped-down version of what is going on (JVM arguments I used: -Xms256m -Xmx1024m -Xss8m):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class Test1 {

    private static ForkJoinPool pool = new ForkJoinPool(2);

    private static class SomeAction extends RecursiveAction {

        private int counter;         //recursive counter
        private int childrenCount=80;//amount of children to spawn
        private int idx;             // just for displaying

        private SomeAction(int counter, int idx) {
            this.counter = counter;
            this.idx = idx;
        }

        @Override
        protected void compute() {

            System.out.println(
                "counter=" + counter + "." + idx +
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());
            if (counter <= 0) return;

            List<SomeAction> list = new ArrayList<>(childrenCount);
            for (int i=0;i<childrenCount;i++){
                SomeAction next = new SomeAction(counter-1,i);
                list.add(next);
                next.fork();
            }


            for (SomeAction action:list){
                action.join();
            }
        }
    }

    public static void main(String[] args) throws Exception{
        pool.invoke(new SomeAction(2,0));
    }
}

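Apparently, when you call join(), the current thread sees that the task it needs has not completed yet and picks up another task to execute itself in the meantime.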

This happens in java.util.concurrent.ForkJoinWorkerThread#joinTask.

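However, this new task spawns more tasks of the same kind, and those cannot find a thread in the pool because the threads are stuck in joins. Since the pool has no way of knowing how long it will take for them to be released (a thread could be in an infinite loop or deadlocked forever), it spawns new threads to compensate for the joining threads, as Louis Wasserman mentioned: java.util.concurrent.ForkJoinPool#signalWork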
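The ForkJoinTask documentation recommends performing joins innermost-first (joining the most recently forked task first), because that task is the one most likely still sitting on top of the joining worker's own deque, where it can simply be popped and run. Test1 joins its children in fork order instead. The sketch below is a hypothetical variant of Test1 that counts leaves instead of printing statistics, computes one child directly and joins the rest in reverse order; it illustrates the recommended ordering and is not a measured fix, since how many threads the pool ends up using still depends on the JDK version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ReverseJoin {

    // Counts the leaves of a tree in which every inner node has fanOut (>= 1) children
    static final class CountLeaves extends RecursiveTask<Long> {
        final int depth;
        final int fanOut;

        CountLeaves(int depth, int fanOut) {
            this.depth = depth;
            this.fanOut = fanOut;
        }

        @Override
        protected Long compute() {
            if (depth == 0) return 1L;
            List<CountLeaves> forked = new ArrayList<>(fanOut - 1);
            for (int i = 0; i < fanOut - 1; i++) {
                CountLeaves child = new CountLeaves(depth - 1, fanOut);
                child.fork();
                forked.add(child);
            }
            // Compute the last child in this thread instead of forking it
            long total = new CountLeaves(depth - 1, fanOut).compute();
            // Join innermost-first: unless it was stolen, the most recently forked
            // task is still on top of this worker's deque and can be popped and run
            for (int i = forked.size() - 1; i >= 0; i--) {
                total += forked.get(i).join();
            }
            return total;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        // Same tree shape as Test1 started with counter 2: 80 children per node, two levels deep
        long leaves = pool.invoke(new CountLeaves(2, 80));
        System.out.println("leaves=" + leaves + " poolSize=" + pool.getPoolSize());
        pool.shutdown();
    }
}
```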

So, to prevent these extra threads from being created, you need to avoid spawning tasks recursively.

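For example, if you change the initial counter in Test1 from 2 to 1 (i.e. pool.invoke(new SomeAction(1,0))), the number of active threads stays at 2, even if you increase childrenCount tenfold.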
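A different technique, not mentioned in this answer: CountedCompleter (Java 8 and later) lets a tree of tasks finish without any worker ever calling join(). Each task adds one to its pending count per forked child and completes when the last child reports back through tryComplete(), so none of the pool's workers sit in a join waiting to be compensated. The sketch sums an array; the class name, the SumTask shape and THRESHOLD are hypothetical choices, modelled loosely on the ForEach example in the CountedCompleter documentation.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

public class CompleterSum {
    static final int THRESHOLD = 1_000; // hypothetical leaf size

    static final class SumTask extends CountedCompleter<Void> {
        final long[] data;
        final int lo, hi;
        final AtomicLong total;

        SumTask(CountedCompleter<?> parent, long[] data, int lo, int hi, AtomicLong total) {
            super(parent);
            this.data = data;
            this.lo = lo;
            this.hi = hi;
            this.total = total;
        }

        @Override
        public void compute() {
            int l = lo, h = hi;
            // Fork off the upper half until the remaining range is small enough
            while (h - l > THRESHOLD) {
                int mid = (l + h) >>> 1;
                addToPendingCount(1); // one more child must report back before this task completes
                new SumTask(this, data, mid, h, total).fork();
                h = mid;
            }
            long s = 0;
            for (int i = l; i < h; i++) s += data[i];
            total.addAndGet(s);
            // Decrements the pending count if nonzero; otherwise completes this task and
            // propagates to its parent. No thread ever waits in join() here.
            tryComplete();
        }
    }

    // Called from outside the pool; only this external caller waits for the root task
    static long sum(ForkJoinPool pool, long[] data) {
        AtomicLong total = new AtomicLong();
        pool.invoke(new SumTask(null, data, 0, data.length, total));
        return total.get();
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        ForkJoinPool pool = new ForkJoinPool(2);
        System.out.println("sum=" + sum(pool, data) + " poolSize=" + pool.getPoolSize());
        pool.shutdown();
    }
}
```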

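Also note that in Test1, while the number of active threads grows, the number of running threads stays less than or equal to the parallelism level.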
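On Java 9 and later (so not for the Java 7 code discussed here), the number of threads can also be capped explicitly: the extended ForkJoinPool constructor accepts a maximumPoolSize, and a saturate predicate decides what happens when a blocked worker can no longer be replaced. The sketch assumes a parallelism of 2 and an arbitrary cap of 4 threads; the Fib workload is only a hypothetical stand-in. According to the javadoc, when saturate returns true the pool keeps running with fewer runnable threads instead of throwing RejectedExecutionException, which may not guarantee progress for every workload.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class BoundedPool {

    // Java 9+: parallelism 2, at most 4 threads in total (i.e. at most 2 compensation threads)
    static ForkJoinPool boundedPool() {
        return new ForkJoinPool(
                2,                                               // parallelism
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,                                            // no UncaughtExceptionHandler
                false,                                           // default (LIFO) local scheduling
                2,                                               // corePoolSize
                4,                                               // maximumPoolSize: hard cap on threads
                1,                                               // minimumRunnable
                pool -> true,                                    // saturate: continue instead of throwing
                60, TimeUnit.SECONDS);                           // keep-alive for idle extra threads
    }

    // Hypothetical workload: naive Fibonacci with the fork/compute/join idiom
    static final class Fib extends RecursiveTask<Integer> {
        final int n;

        Fib(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n < 2) return n;
            Fib f1 = new Fib(n - 1);
            f1.fork();                                 // run n-1 asynchronously
            return new Fib(n - 2).compute() + f1.join(); // compute n-2 here, then join
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = boundedPool();
        System.out.println("fib(20)=" + pool.invoke(new Fib(20))
                + " poolSize=" + pool.getPoolSize());
        pool.shutdown();
    }
}
```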

Original question on Stack Overflow: https://stackoverflow.com/questions/10797568/
