Java 并发 - 管理线程生命周期(启动/停止)的更好设计方法

标签 java concurrency

我正在设计一个并发 Java 应用程序,该应用程序从医院 Intranet 上提供的各种医疗设备读取数据。

我已经阅读了“Java 并发实践 - Brian Goetz...”来了解如何做事,但我认为我仍然缺少一些东西。

这是一个quick simple diagram我正在尝试做的事情,下面有一些代码片段..

工作线程(MedicalDeviceData 实例)不断从医疗设备读取数据,并将其提供给 MedicalDeviceWorkManager,而 MedicalDeviceWorkManager 又将其提供给最终用户。
工作线程继续无限地读取数据(理想情况下),并且在我的场景中不存在“工作已完成”的情况。 此外,用户可以根据需要选择启动所有设备或启动特定设备或停止设备。

下面是我如何实现它的代码片段(已编译但未测试)。

MedicalDeviceWorkManager - 生成工作线程并管理它们。

MedicalDeviceData - 工作线程无限地从医疗设备获取数据并更新此类的实例。

主要看startDevice、stopDevice和run方法。

您显然会注意到,我没有使用 ThreadPoolExecutor 和 Future,并且我只是在这里推出了自己的实现。

由于 future.get 会阻塞直到工作完成,这对我的情况没有意义,因为我的工作线程永远不会“完成”任务......它只是一个无限持续的任务......

问题:如何将下面所示的实现更改为更标准化的实现,以便我可以更好地使用 java.util.concurrent 包(ThreadPoolExecutor/Future)。

我应该考虑其他更好的设计模式吗?

public class MedicalDeviceWorkManager {

  private ThreadGroup rootThreadGroup = null;
  Hashtable<String, MedicalDeviceData> deviceObjs = new Hashtable<String, MedicalDeviceData>();

  public void manageMedicalDevices() throws InterruptedException  {

    String[] allDevices={"Device1","Device2","Device3","Device4"};

    //-- Start all threads to collect data
    for(String deviceToStart:allDevices){
      this.startDevice(deviceToStart);
    }

    //-- Stop all threads 
    for(String deviceToStop:allDevices){
      this.stopDevice(deviceToStop);
    }

    //-- Start on request from user
    String deviceToStart="Device1";
    this.startDevice(deviceToStart);

    //-- Stop on request from user.
    String deviceToStop="Device1";
    this.stopDevice(deviceToStop);

    /* 
     * Get Data and give it to client 
     * This is happening via a separate TCP port
     * */
    while(true){
      for(String deviceName:allDevices){
        if(deviceObjs.get(deviceName)!=null){

          ConcurrentHashMap<String,BigDecimal> devData=deviceObjs.get(deviceName).getCollectedData();

          //--Loop and send data to client on TCP stream
          ;
        }
      }//-- loop the devices
    }//-- infinite
  }

  //-- Start the device to start acquiring data using a worker thread
  private void startDevice(String deviceName){
    //-- Get Device instance
    MedicalDeviceData thisDevice=deviceObjs.get(deviceName);
    if(thisDevice==null){
      thisDevice=new MedicalDeviceData(deviceName);
      deviceObjs.put(deviceName, thisDevice);
    }

    //-- Create thread to start data acquisition 
    //-- Start if not being processed already (Handle what if thread hung scenario later)
    if(this.getThread(deviceName)==null){
      Thread t=new Thread(thisDevice);
      t.setName(deviceName);
      t.start();          
    }
  }


  //-- Stop the worker thread thats collecting the data.
  private void stopDevice(String deviceName) throws InterruptedException {
    deviceObjs.get(deviceName).setShutdownRequested(true);
    Thread t=this.getThread(deviceName);
    t.interrupt();
    t.join(1000);
  }

  private Thread getThread( final String name ) {
    if ( name == null )
        throw new NullPointerException( "Null name" );
    final Thread[] threads = getAllThreads( );
    for ( Thread thread : threads )
        if ( thread.getName( ).equals( name ) )
            return thread;
    return null;
  }

  private ThreadGroup getRootThreadGroup( ) {
      if ( rootThreadGroup != null )
          return rootThreadGroup;
      ThreadGroup tg = Thread.currentThread( ).getThreadGroup( );
      ThreadGroup ptg;
      while ( (ptg = tg.getParent( )) != null )
          tg = ptg;
      return tg;
  } 

  private Thread[] getAllThreads( ) {
    final ThreadGroup root = getRootThreadGroup( );
    final ThreadMXBean thbean = ManagementFactory.getThreadMXBean( );
    int nAlloc = thbean.getThreadCount( );
    int n = 0;
    Thread[] threads;
    do {
        nAlloc *= 2;
        threads = new Thread[ nAlloc ];
        n = root.enumerate( threads, true );
    } while ( n == nAlloc );
    return java.util.Arrays.copyOf( threads, n );
  }

}//-- MedicalDeviceWorkManager




public class MedicalDeviceData implements Runnable{

  //-- Data Collected from medical device
  private final ConcurrentHashMap<String,BigDecimal> collectedData=new ConcurrentHashMap<String,BigDecimal>();

  //-- Set by Thread Manager to request a shutdown..after which it should interrupt the thread
  private AtomicBoolean shutdownRequested;

  //-- Simple data Counter
  private AtomicInteger dataCounter=new AtomicInteger(0);

  //-- Device Name
  private String thisDeviceName;

  public void run() {

    //-- Initialize I/O for the device
    ;

    while(!this.getShutdownRequested()){
      try{
        //-- just to compile the code
        Thread.sleep(0);

        //-- perform I/O operation to get data from medical device
        ;

        //-- Add data into the ConcurrentHashMap...Both key and value are immutable.
        collectedData.put("DataKey", new BigDecimal("9999"));

        //-- data counter
        dataCounter.getAndIncrement();

      }
      catch(InterruptedException ie){
        if(this.getShutdownRequested()){
          return;
        }
        //throw new InterruptedException();
      }
    }

  }//-- run

  public MedicalDeviceData(String thisDeviceName){
    this.thisDeviceName=thisDeviceName;
  }

  /**
   * @return the shutdownRequested
   */
  public boolean getShutdownRequested() {
    return this.shutdownRequested.get();
  }


  /**
   * @param shutdownRequested the shutdownRequested to set
   */
  public void setShutdownRequested(boolean shutdownRequested) {
    this.shutdownRequested.set(shutdownRequested);
  }


  /**
   * Both key and value are immutable, so ok to publish reference.
   * 
   * @return the collectedData
   */
  public ConcurrentHashMap<String, BigDecimal> getCollectedData() {
    return collectedData;
  }


  /**
   * @return the dataCounter
   */
  public AtomicInteger getDataCounter() {
    return dataCounter;
  }

}

最佳答案

因此,线程池可能是也可能不是最好的用法,因为没有真正的工作抽象。然而,这可能是扩展线程的一个有趣的用例。我将使用简单的 j.u.c.Lock 和 j.u.c.Condition 来处理通信。最后,我会将停止和启动委托(delegate)给接受 Runnable 类型作为单独的常量工作单元的特定委托(delegate)类。

我想要的唯一区别是使用 stop 来关闭并使用暂停来暂停,启动将继续工作。例如,它可能看起来像这样。

public class MedicalDeviceWorkManager {

   private ConcurrentHashMap<String, DelegatingThread> devices = new ConcurrentHashMap<...>();

   public synchronized void registerDevice(String device, Runnable singleUnitOfWork){
         DelegatingThread worker = new DelegatingThread(singleUnitOfWork);
         devices.put(device,worker);

         worker.start();

   } 
   public void startDevice(String device){
        devices.get(device).startDevice();
   }    
   public void stopDevice(String device){
        devices.get(device).stopDevice();
   }    
   public void pauseDevice(String device){
        devices.get(device).pauseDevice();
   }    

   private static class DelegatingThread extends Thread{
      final Lock lock = new ReentrantLock();
      final Condition condition = lock.newCondition();
      final Runnable r;
      boolean paused = true;
      boolean stopped =false;
      private DelegatingThread (Runnable r){ this.r = r; }
      @Override
      public void run(){
          while(true){

             lock.lock(); 
             while(paused)
              condition.await(); 

             if(stopped) return;

             lock.unlock();

             r.run();
          }
      }
      private void startDevice(){
            lock.lock(); 
            paused = false;
            condition.signal(); 
            lock.unlock();
      }
      private void pauseDevice(){
            lock.lock(); 
            if(!stopped)
               paused = true;
            lock.unlock();
      }
      private void stopDevice(){
            lock.lock(); 
            stopped= true;
            paused=false;
            condition.signal();
            lock.unlock();
      }
   } 
}

我知道要演示的代码很多,但线程池背后的原理相同。您将始终为每个设备创建一个线程,并在必要时重用。可运行程序将是您在 while 循环之间运行的单个函数。

此外,为了简洁起见,我没有包含 try/finally 语义。

关于Java 并发 - 管理线程生命周期(启动/停止)的更好设计方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6286702/

相关文章:

java - 如何在 GWT 中取消转义字符串

java - Sybase ASE 到 HSQLDB JUnit java.sql.SQLSyntaxErrorException : type not found or user lacks privilege

java - tomcat 日志存储在一个单独的文件中

java - Java 在 32 位机器上可以创建的线程数

java - 同步到要实例化的对象

java - 同步与锁定

java - HttpJUnitRunner 无法解析为类型

java - JavaEE EJB/Web 容器中的线程创建

java - 寻找多线程死锁的原因?

java - 试图了解 ConcurrentHashMap 的范围