Java线程被杀死

标签 java multithreading thread-safety

我有一个java代码(非常古老的遗留代码),它创建了许多线程。 当进程运行时,一些线程被杀死而没有任何痕迹。 看了代码,感觉异常处理很到位。但我不确定为什么有些线程被杀死。这个论坛中的任何专家都可以帮助我这个类是否缺少有关线程处理的任何内容吗?

代码如下:

public class WorkerGroup {

  // This value shows the number of all none-idleling workers.
  private volatile int m_numbersOfActiveWorkingWorkers = 0;
  // This value is set by when a single worker wants to starve multi workers
  private volatile int m_starveRequests = 0;
  // Thread group
  // Java Collection of all workers.
  protected static ThreadGroup s_workGroup = null;
  // Queue manager
  // Work group name
  private final String m_workGroupName;
  // Shut Down process which is hooked onto termination process of the
  // application.
  private static FtpWorkerShutdownHook s_shutDownHook = null;
  private final Log m_log = Log.create(WorkerGroup.class);
  /**
   * CTOR WorkerGroup and collector of workers
   * @param workGroupName will name the workGroup
   * @param manager Queue manager
   */
  public WorkerGroup(final String workGroupName) {
    super();
    m_workGroupName = workGroupName;
    if (s_shutDownHook == null) {
      s_shutDownHook = new FtpWorkerShutdownHook();
      Runtime.getRuntime().addShutdownHook(s_shutDownHook);
    }
  }
  /**
   * Start procedure to start all workers. This function can only be called
   * once.
   * @param numberOfWorkers The number of available workers for this group
   */
  public void startWorkers(int numberOfWorkers) {
    // can not use negative count
    if (numberOfWorkers < 0) {
      return;
    }
    // workgroup is assigned alread. Run once only.
    if (s_workGroup != null) {
      return;
    }
    // creation of the working group. All workers and the work group are
    // set to be deamon threads. This will keep the main process alive until
    // all workers are terminated.
    s_workGroup = new ThreadGroup("Workers of " + m_workGroupName);
    s_workGroup.setDaemon(true);

    // maximize number of workers
    if (numberOfWorkers > 16) {
      numberOfWorkers = 16;
    }
    // create and start all workers.
    for (int i = 0; i < numberOfWorkers; i++) {
      startWorker("FtpWorker" + String.valueOf(i + 1));
    }
  }

//  public void startWorker(final String workerName, final ITaskActions task) {
//    final FtpWorker worker = new FtpWorker(workerName, this);
//    worker.setDaemon(true);
//    worker.start();
//  }
  public void startWorker(final String workerName) {
    final FtpWorker worker = new FtpWorker(workerName, this);
    worker.setDaemon(true);
    worker.start();
  }
  /**
   * Explicit shutdown procedure. The normal case is to kill the process
   */
  public void shutDown() {
    final Thread thread = new FtpWorkerShutdownHook();
    thread.start();
    try {
      thread.join();
      m_log.event("ftp work group shutdown thread terminated");
      // LogAgent
    }
    catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  /**
   * Returns the number of running (non terminated) threads assinged the the
   * thread group
   * @return Number of active threads
   */
  protected int activeCount() {
    if (s_workGroup == null) {
      return 0;
    }
    else {
      synchronized (s_workGroup) {
        final int count = s_workGroup.activeCount();
        // LogAgent.event("work group", "active count" + count + " active "
        // + m_numbersOfActiveWorkingWorkers);
        return count;
      }
    }
  }

   /**
   * This function returns if the worker group is active This function may be
   * overriden.
   * 
   * @return True if there is workers available and/or
   */
  public synchronized boolean runnable() {
    return activeCount() > 0;
  }

  // **************************************************************************
  // Mutex operation
  // **************************************************************************

  private final Mutex m_mutex = new Mutex();

  public synchronized void incrementRunningWorkers() {
    if (m_numbersOfActiveWorkingWorkers == 0) {
      try {
        m_mutex.acquire();
      }
      catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    ++m_numbersOfActiveWorkingWorkers;
  }

  public synchronized void decrementRunningWorkers() {
    --m_numbersOfActiveWorkingWorkers;
    if (m_numbersOfActiveWorkingWorkers == 0) {
      m_mutex.release();
    }
  }

  protected void incrementStarveRequest() {
    synchronized (m_mutex) {
      m_starveRequests++;
    }
  }

  protected void decrementStarveRequest() {
    synchronized (m_mutex) {
      m_starveRequests--;
    }
  }

  public void acquire() {
    try {
      synchronized (m_mutex) {
        m_mutex.acquire();
      }
    }
    catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void release() {
    synchronized (m_mutex) {
      m_mutex.release();
    }
  }

  // --------------------------------------------------------------------------
  // Basic Test getter methods
  // --------------------------------------------------------------------------

  protected final int getWorkingWorkersCount() {
    return m_numbersOfActiveWorkingWorkers;
  }

  protected final boolean workersIsStarving() {
    return m_starveRequests != 0;
  }
}

Ftpworker 类是:

public class FtpWorker extends Thread{


  // This instance will use log facility for debug and for CIF log.
  private final Log m_log = Log.create(FtpWorker.class);

  // pointer to onwer of this worker
  protected FtpWorkerGroup m_workerGroup = null;

  // setting m_running to false cause worker to terminate
  protected boolean m_running = true;
  private static boolean isRetryList = false;

  // default sleeping time in mSec
  protected int m_workerSleep = 2000;

  protected EAServer m_eaServer = EAServer.createInstance();

  private final NeHandler m_neManager = NeHandler.getHandler();

  protected List<String> fileNotf = new Vector<String>();

  protected List<String> retrylist = Collections.synchronizedList(new ArrayList<String>());

  public FtpWorker(final String name, final FtpWorkerGroup workerGroup) {
    super(FtpWorkerGroup.s_workGroup,name);

  }

  public void run() {
    idle();
    shutDownEvent();
  }

  private String getTask() {
    String notification=null;
    String nename;
    NeInfo ne=null;
    int delim;
    if(retrylist.size()>0){
      m_log.trace("============ RETRY LIST===== " + Thread.currentThread().getName());
      m_log.trace("RETRY  LIST ::" + retrylist )  ;

     if((retrylist.isEmpty())){
       m_log.trace("RETRY LIST IS EMPTY ");

       return null;
     }
      notification= retrylist.remove(0);
      m_log.trace("Thread ::  " + Thread.currentThread().getName() + " scanning list for Notification[RETRY] :: " + notification );
      delim = notification.indexOf("$");
      nename= notification.substring(0, delim);
      try {
        ne = m_neManager.getNetworkElement(nename);
      }catch (NeNotFoundException e1) {
        m_log.critical("Unable to get properties for Network Elemenet "
            + nename + " Reason: " + e1.getMessage()+ "The Network Element is of Unsupported Version");      
      }
     if((ne.getValidState()== NeInfo.NE_IS_STOPPED ) || (ne.getValidState()== NeInfo.NE_IS_STOPPING)){
       if((retrylist.isEmpty())){
         m_log.trace("RETRY LIST IS EMPTY ");
         FtpWorker.isRetryList = false;
         return null;
       }
       else{
         int size = retrylist.size();
         for(Iterator it =  retrylist.iterator();it.hasNext(); ){
           String test = (String)it.next();
           // m_log.trace("  RETRY LIST  files to be deleted, test is   "+test );
           if(test.contains(ne.getName())){
             //retrylist.remove(test);
             it.remove();
             m_log.trace("file"+ test+ "is deleted from list");
             m_log.trace(" NEW RETRY LIST FILE NOTIFICATION LIST ::" + retrylist ) ; 
           }
         }
        if((retrylist.isEmpty())){
          m_log.trace("RETRY LIST IS EMPTY ");
          FtpWorker.isRetryList = false;
         }
        return null;
       }
      }
      if(!ne.getisInFtp()){
        m_log.trace("FTP Not Set[RETRY] " +notification);
        ne.setisInFtp();
        m_log.trace("FETCH FILE USING NOTIFICATION[RETRY]  " +notification);
        FtpWorker.isRetryList = false;
        m_log.trace("============Returning from RETRY LIST===== " + Thread.currentThread().getName());
        return notification;
      }
      else{
        m_log.trace("FTP Set. ADDING NOTIFICATION BACK TO RETRY QUEUE:::: "  +notification);
        FtpWorker.isRetryList = true;
        retrylist.add(notification);
        m_log.trace("============Returning NULL RETRY LIST===== " + Thread.currentThread().getName());
        return null;
      }
    }
    if(EAServer.fileNotification!=null && (EAServer.fileNotification.size())>0 && !(isRetryList) ){
      m_log.trace("====ORIGINAL FILE NOTIFICATION LIST== " + Thread.currentThread().getName());
      try{

      m_log.trace("ORIGINAL FILE NOTIFICATION LIST ::" + EAServer.fileNotification + " for Thread " + Thread.currentThread().getName())  ;

      if((EAServer.fileNotification.isEmpty())){
        m_log.trace("ORIGINAL LIST IS EMPTY ");
        return null;
      }


      notification = (String)EAServer.fileNotification.remove(0);

      m_log.trace("Thread ::  " + Thread.currentThread().getName() + " scanning list for Notification[ORIGINAL] :: " + notification );
      delim = notification.indexOf("$");
      nename= notification.substring(0, delim);
      m_log.trace("NOTIFICATION FOUND :::   " +notification);
      try {
        ne = m_neManager.getNetworkElement(nename);
      }catch (NeNotFoundException e1) {
        m_log.critical("Unable to get properties for Network Elemenet "
            + nename + " Reason: " + e1.getMessage()+ "The Network Element is of Unsupported Version");      
      }
      }catch(Exception e){
        m_log.trace("Exception caught while reading fileNotification " + e.getMessage() + " for Thread  " + Thread.currentThread().getName());
        return null;
      }
      if((ne.getValidState()== NeInfo.NE_IS_STOPPED ) || (ne.getValidState()== NeInfo.NE_IS_STOPPING)){
        try {
          ne.setisNotInFtp();
          ne.deleteFileWithoutCollecting(ne.getFilePath());
          ne.setValidState(NeInfo.NE_IS_STOPPED);
        }
        catch (CDMException e) {
          m_log.trace("Could not delete file for   " + ne.getName()
              + "  because  " + e.getMessage());
        }
        if((EAServer.fileNotification.isEmpty())){
          m_log.trace("ORIGINAL LIST IS EMPTY NOW");
          return null;
        }
        else{
       try {
         m_log.trace(" DELETING for NE Since Node is Stopped "+ ne.getName());
         int size = EAServer.fileNotification.size();
         m_log.trace("Size of ORIGINAL NOTIFICATION LIST " + size);
         for(Iterator it =  EAServer.fileNotification.iterator();it.hasNext(); ){
           String test = (String)it.next();
           if(test.contains(ne.getName())){
             it.remove();
             m_log.trace("file"+ test+ "is deleted from list");
             m_log.trace(" NEW ORIGINAL FILE NOTIFICATION LIST ::" + EAServer.fileNotification ) ; 
           }
         }
        return null;
        }
         catch(Exception e){
          m_log.trace(" CCCCCC IN exception" + e.getMessage());
          return null;
        }
        }


      }
      if(!ne.getisInFtp()){
      /*  if((ne.getValidState() == NeInfo.NE_IS_STOPPED) || (ne.getValidState() == NeInfo.NE_IS_STOPPING)){
          return null;
        }*/
        m_log.trace("FTP Not Set[ORIGINAL] " +notification);
        ne.setisInFtp();
     /*  //Remove
        try{
          m_log.trace("Thread is going to sleep");
        Thread.sleep(120000);
        } 
        catch(Exception e){
         //Do nothing
        }
        //Remve end */
        m_log.trace("FETCH FILE USING NOTIFICATION[ORIGINAL]  " +notification);
        m_log.trace("====Returning from ORIGINAL LIST===== " + Thread.currentThread().getName());
        return notification;
      }
      else{
        m_log.trace("FTP Set.ADDING NOTIFICATION BACK TO RETRY QUEUE:::: ::: "+notification);
//        EAServer.fileNotification.add(notification);
        FtpWorker.isRetryList = true;
        retrylist.add(notification);
        m_log.trace("===+Returning NULL from ORIGINAL LIST===== " + Thread.currentThread().getName());
        return null;
      }
    }

    return null;
  }

  private void idle() {
    while (m_running) {
     // m_log.trace("ftp wroker" + Thread.currentThread().getName() + " is active");
      final String task = getTask();
      if (task != null ) {
        // if a task is found increment working threads and run work as long
        // there exists tasks.
        work(task);
      }
   /*   else{
       m_log.trace("task is null which means no file notifications received yet");
      }*/
      try {
        //m_log.trace("No TASK.SLEEPING" + Thread.currentThread().getName());
        sleep(m_workerSleep);
      }
      catch (InterruptedException e) {
        interruptEvent(e);
      }
    }
  }

  private void work(final String t) {
    String task = t;
    do {
      m_eaServer.execute(task);
      m_log.trace("TASK  EXECUTED ::::  " + task + " by Thread  " + Thread.currentThread().getName());
      yield();
      task = getTask();

    }
    while ((task != null) && m_running);
  }

  public void shutDown() {
    m_running = false;
  }

  /**
   * Generates and logs an event. Should be called when the the worker
   * shutdowns.
   */
  protected void shutDownEvent() {
    m_log.event("worker " + super.getName() + " has stopped.",
        LogEventType.KERNEL_WORKER_EVENT);
  }

  /**
   * Generates and logs an event from the given Exception. Should be called when
   * the worker has been interupted.
   * 
   * @param e the exception.
   */
  protected void interruptEvent(final Exception e) {
    m_log.event("worker " + super.getName() + " has interrupted, cause by "
        + e.getMessage() + ".", LogEventType.KERNEL_WORKER_EVENT);
  }


}

最佳答案

如果线程在没有记录关闭事件的情况下退出,那么我会假设有什么东西抛出了运行时异常。对于初学者,我会尝试以下操作来查看抛出的内容:

public void run() {
  try {
    idle();
    shutDownEvent();
  } catch (Throwable e) {
    m_log.critical(e.getMessage();
  }
}

关于Java线程被杀死,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22346214/

相关文章:

java - 如何在 java 中获取代理对象的底层类型?

java - Eclipse 包资源管理器中的文件名旁边的 >(大于括号)是什么意思?

Firebase 实时数据库中的 Java HTTP 调用用于软件中的数据更改

python - 为什么这个http线程池死了(加入),但仍然正常工作?

c++ - 赋值运算符 '=' 是原子的吗?

java - .class 在 Java 中定义在哪里? (它是一个变量还是什么?)

multithreading - 在线程中复制文件

java - Java中使用RocksDB执行并行写入时出现 "Failed to create lock"异常

java - 使用线程打印奇偶数

java - 我需要一个线程安全的数据结构。我应该用什么?