java - 以分层方式创建多级线程

标签 java multithreading executorservice

我正在尝试创建一个应用程序,它将以树状方式创建线程。我的主要方法是在Level0,它在Level1创建线程。然后Level1将创建一些线程。 Level1 中的每个线程将创建与 Level2 不同的线程集,依此类推。

下面是我尝试使用 ExecutorService 的代码:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadTree {

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();

//This is the main , level0

List ll = new ArrayList<>();
ll.add(2);
ll.add(5);
ll.add(8);

Iterator it = ll.iterator();
while (it.hasNext()) {

    exec.submit(new Level1((Integer)it.next()));
}
}
}

class Level1 implements Runnable{

private ScheduledExecutorService exec;
private Integer num;

public Level1(Integer n){
    num = n;
    exec = Executors
            .newScheduledThreadPool(n);
}

@Override
public void run() {

    for(int i=0;i<num;i++){
        exec.scheduleAtFixedRate(new Level2(), 0, 2, TimeUnit.SECONDS);
    }


}

}

class Level2 implements Runnable{

@Override
public void run() {

    System.out.println("Current Thread ID : " + Thread.currentThread().getId() + "Current Thread Name : " 
            + Thread.currentThread().getName()) ;
    //Do some task and create new threads

}

}

我有 2 个问题:

  1. 这是以树的方式创建线程的唯一方法吗?有没有其他方法可以通过一些分组来有效地处理这个问题?我已经展示了 3 个级别,但也可以有更多级别。
  2. 如果这是一个好方法,那么将线程中的任何故障传播到其上方的某个级别的最佳方法是什么,等等。

提前致谢。

最佳答案

我认为您可以按照以下步骤实现这一目标 -

  • 仅使用一个线程池执行器
  • 创建从子任务到父任务的通知机制。 (请注意任务线程不同。任务只是一个Runnable对象,而Thread是一个Java线程,通过执行器执行可运行任务。)

请引用下面的示例 -

public class ThreadTree {

private static final ExecutorService executor = Executors.newCachedThreadPool();

public static void main(String[] args) {
    List<Integer> level1Nodes = new ArrayList<Integer>();
    level1Nodes.add(2);
    level1Nodes.add(5);
    level1Nodes.add(8);
    // start threads
    for (Integer num : level1Nodes) {
        executor.submit(new Level1(num));
    }
}

private static class Notification {
    private final Object result;
    private final Exception rootException;

    public Notification(Object result, Exception rootException) {
        this.result = result;
        this.rootException = rootException;
    }

    public Object getResult() {
        return result;
    }

    public Exception getRootException() {
        return rootException;
    }
}

private static abstract class NotificationHandler {
    private final AtomicInteger expectedNotifications;
    private final List<Notification> notifications;

    public NotificationHandler(int expectedNotifications) {
        this.expectedNotifications = new AtomicInteger(expectedNotifications);
        this.notifications = new ArrayList<Notification>();
    }

    public void handleNotification(Notification notification) {
        notifications.add(notification);
        if (expectedNotifications.decrementAndGet() == 0) {
            postRun(notifications);
        }
    }

    public void postRun(List<Notification> notifications) {
        for (Notification notification : notifications) {
            System.out.println("Status: " + (notification.getRootException() == null ? "Failed" : "Success") + ", Result: " + (notification.getResult() != null ? notification.getResult() : "No result"));
        }
    }
}

private static class Level1 extends NotificationHandler implements Runnable {
    private final int num;

    public Level1(int num) {
        super(num);
        this.num = num;
    }

    public void run() {
        for (int i = 0; i < num; i++) {
            executor.submit(new Level2(2, this)); // 2 is just an assumed number of nodes at level 2
        }
    }
}

private static class Level2 extends NotificationHandler implements Runnable {
    private final int num;
    private final NotificationHandler parentNotificationHandler;

    public Level2(int num, NotificationHandler parentNotificationHandler) {
        super(num);
        this.num = num;
        this.parentNotificationHandler = parentNotificationHandler;
    }

    public void run() {
        for (int i = 0; i < num; i++) {
            executor.submit(new Level2(2, this)); // 2 is just an assumed number of nodes at level 3
        }
        // execute the task and then notify parent
        parentNotificationHandler.handleNotification(new Notification("done", null));
    }
}

private static class Level3 extends NotificationHandler implements Runnable {
    private final int num;
    private final NotificationHandler parentNotificationHandler;

    public Level3(int num, NotificationHandler parentNotificationHandler) {
        super(num);
        this.num = num;
        this.parentNotificationHandler = parentNotificationHandler;
    }

    public void run() {
        // execute the task and then notify parent
        parentNotificationHandler.handleNotification(new Notification("done", null));
    }
}
}

此处通知是从Level3 -> Level2' ->Level1'传递的。每个子任务都有责任在完成自己的工作后通知父级。所有子任务通知后,父任务将执行运行后操作并通知其父任务。

这里,线程池执行器使用哪些线程并不重要。唯一重要的是遵循每个子任务通知父级的规则,然后父级执行后期操作并进一步通知其父级。

Notification 类由 resultrootException 组成,可以从子任务中设置它们,以便父级可以知道子任务中出了什么问题并异常可以上升到顶层。

关于java - 以分层方式创建多级线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31026383/

相关文章:

java - 将工作发送到需要初始化的工作池

Java泛型和基类

java - Java中如何将char类型转换为boolean类型?

java - 两个互相隔离的顶点实例

Java 将具有同步块(synchronized block)的线程yield() 转移给也具有同步块(synchronized block)的较高优先级线程。

java - 关于线程的wait()/notify

java - 使用参数重定向 (POST) 到外部 URL 并在 JSP 上获取其响应

java - ExecutorService 正在获取仍在构建中的可运行对象

java - 如何在java中取消一个ExecutorService

java - 在 Java 中使用 Executor 编写文件